
Add data provider #69

Open · wants to merge 24 commits into main
Conversation

@harryrackmil (Contributor) commented Jan 10, 2025

Change

Add a framework for reporting data from arbitrary sources to the publisher agent

Testing

Ran this and a publisher agent together via docker-compose and confirmed that the prices looked reasonable and that the publisher agent was able to parse and sign them.


func init() {
	DataProviderCmd.Flags().StringP(ConfigFilePathFlag, "c", "", "the path of your config json file")
	DataProviderCmd.Flags().StringP(WebsocketUrl, "w", "", "the websocket url to write updates to")
Contributor Author:

This could be configured in the config json if we wanted, but it felt like more of a runtime configuration (I might want to test without a websocket url at first to just look at the prices).

@akawalsky (Contributor) commented Jan 13, 2025:

What do you think of making this a bit more generic - something like -o where the output could eventually take the form of

  1. ws interface
  2. http interface
  3. a file buffer interface

which could be denoted by ws(s)://, http(s)://, file://, s3:// etc

Only ws should need to be supported right now though

Contributor Author:

makes sense - added a Writer interface and a GetWriter function where we can do some branching in the future. Currently it will fail if the --output-address isn't either blank or prefixed with ws://
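
For illustration, the branching in GetWriter could look something like the sketch below; the Writer method set and the NewWebsocketWriter constructor are assumptions here, not necessarily what the PR uses:

```go
import (
	"fmt"
	"strings"
)

// Writer is a stand-in for the interface described above; the real method
// set in the PR may differ.
type Writer interface {
	Run(updatesCh chan types.DataSourceUpdateMap)
}

// GetWriter picks an output implementation from the --output-address value:
// blank means "no writer, just log updates", ws:// selects the websocket
// writer, and anything else is rejected for now. http://, file://, s3://
// and so on can be added as new cases later.
func GetWriter(outputAddress string) (Writer, error) {
	switch {
	case outputAddress == "":
		return nil, nil // no writer configured; updates are only logged
	case strings.HasPrefix(outputAddress, "ws://"):
		return NewWebsocketWriter(outputAddress), nil
	default:
		return nil, fmt.Errorf("unsupported --output-address: %q", outputAddress)
	}
}
```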

}

func buildDataSources(config DataProviderConfig) []dataSource {
	// group by data source id to support batched feeds
Contributor Author:

overkill for our current sources (right now every dataSource is 1:1 with a value) but we might want to support batching of feeds in the future
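
As a sketch of the grouping (the field names on the config structs here are assumed, not taken from the diff):

```go
// buildDataSources buckets per-feed configs by data source id and hands each
// bucket to that source's builder, so one dataSource could eventually serve
// a whole batch of feeds.
func buildDataSources(config DataProviderConfig) []dataSource {
	grouped := make(map[DataSourceId][]DataProviderSourceConfig)
	for _, sourceConfig := range config.Sources {
		grouped[sourceConfig.DataSourceId] = append(grouped[sourceConfig.DataSourceId], sourceConfig)
	}

	var dataSources []dataSource
	for dataSourceId, sourceConfigs := range grouped {
		dataSources = append(dataSources, GetDataSourceBuilder(dataSourceId)(sourceConfigs)...)
	}
	return dataSources
}
```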


func GetDataSourceBuilder(dataSourceId DataSourceId) func([]DataProviderSourceConfig) []dataSource {
	switch dataSourceId {
	case UniswapV2DataSourceId:
Contributor Author:

This switch statement is the only piece of shared code that needs to change when we write a new integration

Contributor Author:

a simpler example than uniswap. Still using the scheduled data source concept

		go dataSource.Run(r.updatesCh)
	}

	r.writer.Run(r.updatesCh)
Contributor Author:

kick off all the data sources in goroutines and kick off a writer thread

Contributor Author:

a helper to allow pulling on a regular cadence

	valueUpdate := ValueUpdate{
		PublishTimestamp: update.Timestamp.UnixNano(),
		ValueId:          update.ValueId,
		Value:            fmt.Sprintf(`%.18f`, update.Value),
Contributor Author:

convert to string so we don't report very small prices using scientific notation
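
For example, Go's default float formatting falls back to scientific notation for very small values, while %.18f keeps a fixed-point string:

```go
package main

import "fmt"

func main() {
	price := 0.000000000123

	fmt.Printf("%v\n", price)    // 1.23e-10
	fmt.Printf("%.18f\n", price) // 0.000000000123000000
}
```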


	w.logger.Debug().Msgf("Update: %s", string(wsMessageBytes))

	if conn != nil {
Contributor Author:

a user may not configure any websocket url when developing - they can just run the app in verbose mode to see the updates logged

Contributor Author:

not sure where we want this to live. Wanted it outside of the lib/data_provider directory since we're passing a config file to the docker container, not baking it into the docker container

@akawalsky (Contributor):

I'm not seeing an entry point and the docker-compose isn't working locally. How have you been running this?

@@ -0,0 +1,31 @@
package random
Contributor Author:

this factory file is basically all boilerplate - it looks identical to the uniswap_v2/factory.go

}

func (f *randomDataSourceFactory) GetSchema() (*gojsonschema.Schema, error) {
	return utils.LoadSchema("resources/config_schema.json", resourcesFS)
Contributor Author:

GetSchema is really a function of the data source class, not a specific instance of the data source, so I'm making it a function on the factory object. Might be worth renaming the factory interface to DataSourceType or DataSourceClass or something since it's doing more than just building data sources now.
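
Put differently, the factory is growing into something like the interface below (a sketch only; the interface name and the builder method name here are placeholders, not the ones in the diff):

```go
// A "data source class" groups the things that are per-source-type rather
// than per-instance: its config schema and the ability to build instances.
type DataSourceFactory interface {
	GetSchema() (*gojsonschema.Schema, error)
	BuildDataSources(sourceConfigs []DataProviderSourceConfig) []dataSource
}
```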

func NewScheduler(
	updateFrequency time.Duration,
	getUpdate func() (types.DataSourceUpdateMap, error),
	handleErr func(error),
Contributor Author:

rather than passing a logger object to the scheduler, we pass a handleErr function. In practice I expect most sources will call this with the GetErrorLogHandler() function provided in this file, which just logs at a configurable level. Might make more sense to just pass a logger to the scheduler instead.
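
Roughly, the scheduler loop this describes would look like the sketch below (the struct fields mirror the constructor arguments above; the actual implementation may differ):

```go
// Run calls getUpdate once per tick and forwards updates to the channel;
// errors are delegated to handleErr rather than logged by the scheduler.
func (s *Scheduler) Run(updatesCh chan types.DataSourceUpdateMap) {
	ticker := time.NewTicker(s.updateFrequency)
	defer ticker.Stop()

	for range ticker.C {
		updates, err := s.getUpdate()
		if err != nil {
			s.handleErr(err)
			continue
		}
		updatesCh <- updates
	}
}
```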

@@ -0,0 +1,12 @@
package uniswap_v2
Contributor Author:

this file is just the config object, in case we wanted to autogenerate from json schema (or generate json schema from this)

Contributor:

Would it also be worth having a config_test to confirm that the config is deserialized as expected?

@@ -0,0 +1,153 @@
package uniswap_v2
Contributor Author:

the data_source file has minimal boilerplate - it's just the DataSource object, which only needs to implement RunDataSource, and a constructor for the DataSource object.

@@ -0,0 +1,51 @@
package types
Contributor Author:

making types its own package so every other package is able to depend on it

	baseRetryDelay = 500 * time.Millisecond
)

func GetEthereumContract(
Contributor Author:

pulled some of the reusable eth contract stuff out of the uniswap integrations since it's a lot of code and will probably be used in other integrations - might make sense to put it in the utils directory. Leaving in sources for now because it will only be used by data sources

@@ -0,0 +1,25 @@
import os
Contributor Author:

Generate a file which will explicitly force importing each individual data source package. Using python since it was a little less verbose than the go version of this script, but happy to convert to go if we'd like.

We sort the package names here so it generates the same file consistently. We can add a github action which runs this script, checks whether the result differs from the currently committed imports file, and blocks merging if they disagree.
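
The generated file would presumably be a Go file of blank imports along these lines (illustrative only; the real package name, file name, and import paths come from the script):

```go
// Code generated by update_shared_data_provider_code.py. DO NOT EDIT.
package sources

import (
	// Blank imports force each source package to be linked in and its
	// package-level initialization (data source id, schema, etc.) to run.
	_ "github.com/Stork-Oracle/stork-external/apps/lib/data_provider/sources/random"
	_ "github.com/Stork-Oracle/stork-external/apps/lib/data_provider/sources/uniswap_v2"
)
```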

Contributor:

Is there a world where this also adds to the top-level json schema config to support a oneOf of all the included resources using a schema pointer? If so, we could eliminate the awkwardness of having to load each config individually.

Contributor Author:

The schema pointer $ref concept doesn't work so nicely since the config schemas are spread across different embed.FS instances in different packages - we'd need to write some custom schema loader code to use pointers, meaning the schema file couldn't really be used directly since people would need to use our custom loader.

Instead, I'm using the codegen script to generate a big flat config schema file which contains all the individual config schemas. We would have needed to do some codegen for the pointer approach anyway in order to update each pointer when we add new sources - this way we have a single interpretable config schema which doesn't require any custom loading code.

}
}

func (w *WebsocketWriter) runWriteLoop(updateCh chan types.DataSourceUpdateMap) error {
Contributor:

We should prob build in reconnect logic for the case where the publisher agent is restarted

Contributor Author:

This is handled already I believe - if we ever get an error when writing we'll stop the loop and return an error. This function is called inside a for loop in the Run function above so it should wait 5 seconds and then reconnect.
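
i.e. something along these lines in Run (a sketch of the behavior being described, not the exact code in the PR):

```go
func (w *WebsocketWriter) Run(updateCh chan types.DataSourceUpdateMap) {
	for {
		// runWriteLoop returns an error whenever a write fails (e.g. the
		// publisher agent restarted), so we log, wait, and reconnect.
		if err := w.runWriteLoop(updateCh); err != nil {
			w.logger.Error().Err(err).Msg("write loop failed, reconnecting in 5s")
		}
		time.Sleep(5 * time.Second)
	}
}
```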

@akawalsky (Contributor) left a comment:

This is looking awesome, just a few more comments/questions

@@ -49,6 +49,29 @@ jobs:
env:
TARGETPLATFORM: linux/amd64

data-provider-codegen-check:
Contributor Author:

run the codegen script and fail if there's any git diff. Tested this by making a small change in one of the individual config_schema.json files, not running the codegen script, and confirming the github action fails.

@@ -0,0 +1,138 @@
{
Contributor Author:

auto-generated by the update_shared_data_provider_code.py script

@@ -0,0 +1,38 @@
{
Contributor Author:

template file for top level config_schema generation

"github.com/Stork-Oracle/stork-external/apps/lib/data_provider/utils"
)

var RandomDataSourceId types.DataSourceId = types.DataSourceId(utils.GetCurrentDirName())
Contributor Author:

data source id should be the same as the directory name (makes config schema generation easier)
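
A GetCurrentDirName-style helper could be implemented roughly like this (a sketch, not necessarily the actual utils implementation):

```go
package utils

import (
	"path/filepath"
	"runtime"
)

// GetCurrentDirName returns the directory name of the file that called it,
// so a data source package can use its own directory name as its id.
func GetCurrentDirName() string {
	_, callerFile, _, ok := runtime.Caller(1)
	if !ok {
		return ""
	}
	return filepath.Base(filepath.Dir(callerFile))
}
```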

source_config_schemas = []
data_source_ids = []

conditional_source_config_schema_template_str = """
Contributor Author:

we'll use this if/then to make sure that we're checking the appropriate schema for this data source - if we just check that the config matches any defined schema we'll wind up being as permissive as the most permissive schema.

}

with open(CONFIG_SCHEMA_TEMPLATE_FILE, 'r') as template_file:
    template_json = template_file.read()
Contributor Author:

after wrapping each config schema in an if/then and concatenating them all together, fill in the config_schema.json.template file

assert.NoError(t, err)

configStr := `
{
@harryrackmil (Contributor Author) commented Jan 14, 2025:

note that we're not testing the top-level config here (that's tested at the top level); we're specifically testing the json within the config tag, which applies only to this source

@harryrackmil (Contributor Author):

> I'm not seeing an entry point and the docker-compose isn't working locally. How have you been running this?

I committed the changes I'd made locally to the docker-compose so now you can run it from docker-compose:

docker-compose up --build data-provider


To add a new source:
1. Add a [package](../lib/data_provider/sources/random) in the [sources directory](../lib/data_provider/sources) with your data source's name
1. Run `python3 ./apps/scripts/update_shared_data_provider_code.py` to generate some framework code so that the framework is aware of your new source.
Contributor:

Numbers here seem to have gotten messed up somehow

Contributor Author:

they're intentionally all 1 so that markdown interprets them as ordered numbers (might be easier to review the markdown preview for this file). This way we can reorder the steps or add/remove steps without needing to update every number

Contributor:

Oh interesting. Is that a common practice? Not sure I've seen that before
