Skip to content

Commit

Permalink
feat: init
Browse files Browse the repository at this point in the history
feat: add minor functions

feat: add raw reducer code, fetching code

fix: separate reducer code

feat: db migration, fetcher codes

feat: add testcodes, separate types

feat: fetcher fetch data and store in local db

feat: insert redis

fix: test hanging error fix

fix: move utility functions to utils package

fix: typo
  • Loading branch information
nick-bisonai committed Feb 22, 2024
1 parent b4caca3 commit 1d1d341
Show file tree
Hide file tree
Showing 12 changed files with 845 additions and 0 deletions.
1 change: 1 addition & 0 deletions node/migrations/000002_local_aggregates.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS local_aggregates;
5 changes: 5 additions & 0 deletions node/migrations/000002_local_aggregates.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS local_aggregates (
name TEXT NOT NULL UNIQUE,
value INT8 NOT NULL,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL
);
16 changes: 16 additions & 0 deletions node/pkg/db/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ func loadPgsqlConnectionString() string {
return os.Getenv("DATABASE_URL")
}

func QueryWithoutResult(ctx context.Context, queryString string, args map[string]any) error {
pool, err := GetPool(ctx)
if err != nil {
return err
}
rows, err := query(pool, queryString, args)
if err != nil {
return err
}
rows.Close()
return nil
}

// Using this raw function is highly unrecommended since rows should be manually closed
func Query(ctx context.Context, queryString string, args map[string]any) (pgx.Rows, error) {
_pool, err := GetPool(ctx)
if err != nil {
Expand Down Expand Up @@ -86,6 +100,7 @@ func queryRow[T any](pool *pgxpool.Pool, _query string, args map[string]any) (T,
if errors.Is(err, pgx.ErrNoRows) {
return result, nil
}
defer rows.Close()
return result, err
}

Expand All @@ -101,6 +116,7 @@ func queryRows[T any](pool *pgxpool.Pool, _query string, args map[string]any) ([
if errors.Is(err, pgx.ErrNoRows) {
return results, nil
}
defer rows.Close()
return results, err
}

Expand Down
136 changes: 136 additions & 0 deletions node/pkg/fetcher/fetcher.go
Original file line number Diff line number Diff line change
@@ -1 +1,137 @@
package fetcher

import (
"context"
"encoding/json"
"fmt"
"time"

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/db"
"bisonai.com/orakl/node/pkg/utils"
)

func NewFetcher(bus *bus.MessageBus) *Fetcher {
return &Fetcher{
Adapters: make([]AdapterDetail, 0),
Bus: bus,
}
}

func (f *Fetcher) Run(ctx context.Context) {
f.initialize(ctx)

Check failure on line 22 in node/pkg/fetcher/fetcher.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `f.initialize` is not checked (errcheck)

ticker := time.NewTicker(2 * time.Second)

go func() {
for range ticker.C {
err := f.runAdapter(ctx)
if err != nil {
fmt.Println(err)
}
}
}()
}

func (f *Fetcher) runAdapter(ctx context.Context) error {
for _, adapter := range f.Adapters {
result, err := f.fetch(adapter)
if err != nil {
return err
}
aggregated := utils.GetFloatAvg(result)
err = f.insertPgsql(ctx, adapter.Name, aggregated)
if err != nil {
return err
}
err = f.insertRdb(ctx, adapter.Name, aggregated)
if err != nil {
return err
}
}
return nil
}

func (f *Fetcher) insertPgsql(ctx context.Context, name string, value float64) error {
err := db.QueryWithoutResult(ctx, InsertLocalAggregateQuery, map[string]any{"name": name, "value": int64(value)})
return err
}

func (f *Fetcher) insertRdb(ctx context.Context, name string, value float64) error {
key := "latestAggregate:" + name
data, err := json.Marshal(redisAggregate{Value: int64(value), Timestamp: time.Now()})
if err != nil {
return err
}
db.Set(ctx, key, string(data), time.Duration(5*time.Minute))

Check failure on line 66 in node/pkg/fetcher/fetcher.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `db.Set` is not checked (errcheck)
return nil
}

func (f *Fetcher) fetch(adapter AdapterDetail) ([]float64, error) {
adapterFeeds := adapter.Feeds

data := []float64{}

for _, feed := range adapterFeeds {
definition := new(Definition)
err := json.Unmarshal(feed.Definition, &definition)
if err != nil {
fmt.Println(err)
continue
}
res, err := utils.GetRequest[interface{}](definition.Url, nil, definition.Headers)
if err != nil {
fmt.Println(err)
continue
}

result, err := utils.Reduce(res, definition.Reducers)
if err != nil {
fmt.Println(err)
continue
}

data = append(data, result)
}
if len(data) < 1 {
return nil, fmt.Errorf("no data fetched")
}
return data, nil
}

func (f *Fetcher) getAdapters(ctx context.Context) ([]Adapter, error) {
adapters, err := db.QueryRows[Adapter](ctx, SelectActiveAdaptersQuery, nil)
if err != nil {
return nil, err
}
return adapters, err
}

func (f *Fetcher) getFeeds(ctx context.Context, adapterId int64) ([]Feed, error) {
feeds, err := db.QueryRows[Feed](ctx, SelectFeedsByAdapterIdQuery, map[string]any{"adapterId": adapterId})
if err != nil {
return nil, err
}

return feeds, err
}

func (f *Fetcher) initialize(ctx context.Context) error {
adapters, err := f.getAdapters(ctx)
if err != nil {
return err
}
f.Adapters = make([]AdapterDetail, 0, len(adapters))
for _, adapter := range adapters {
feeds, err := f.getFeeds(ctx, adapter.ID)
if err != nil {
return err
}
f.Adapters = append(f.Adapters, AdapterDetail{adapter, feeds})
}
return nil
}

func (f *Fetcher) String() string {
return fmt.Sprintf("%+v\n", f.Adapters)
}
106 changes: 106 additions & 0 deletions node/pkg/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package fetcher

import (
"context"
"encoding/json"
"testing"

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/db"
"github.com/stretchr/testify/assert"
)

func TestFetcherInitialize(t *testing.T) {
admin, err := setup()
if err != nil {
t.Fatalf("error setting up admin: %v", err)
}
err = insertSampleData(admin, context.Background())
if err != nil {
t.Fatalf("error inserting sample data: %v", err)
}
defer admin.Shutdown()

Check failure on line 22 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `admin.Shutdown` is not checked (errcheck)
defer cleanupSampleData(admin, context.Background())

Check failure on line 23 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value is not checked (errcheck)

b := bus.NewMessageBus()
fetcher := NewFetcher(b)
fetcher.initialize(context.Background())

Check failure on line 27 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `fetcher.initialize` is not checked (errcheck)
assert.Greater(t, len(fetcher.Adapters), 0)
assert.Greater(t, len(fetcher.Adapters[0].Feeds), 0)
}

func TestFetcherFetch(t *testing.T) {
admin, err := setup()
if err != nil {
t.Fatalf("error setting up admin: %v", err)
}
err = insertSampleData(admin, context.Background())
if err != nil {
t.Fatalf("error inserting sample data: %v", err)
}
defer admin.Shutdown()

Check failure on line 41 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `admin.Shutdown` is not checked (errcheck)
defer cleanupSampleData(admin, context.Background())

Check failure on line 42 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value is not checked (errcheck)

b := bus.NewMessageBus()
fetcher := NewFetcher(b)
fetcher.initialize(context.Background())

Check failure on line 46 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `fetcher.initialize` is not checked (errcheck)
result, err := fetcher.fetch(fetcher.Adapters[0])
if err != nil {
t.Fatalf("error fetching: %v", err)
}
assert.Greater(t, len(result), 0)
}

func TestFetcherRunAdapter(t *testing.T) {
admin, err := setup()
if err != nil {
t.Fatalf("error setting up admin: %v", err)
}
err = insertSampleData(admin, context.Background())
if err != nil {
t.Fatalf("error inserting sample data: %v", err)
}
defer admin.Shutdown()

Check failure on line 63 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `admin.Shutdown` is not checked (errcheck)
defer cleanupSampleData(admin, context.Background())

Check failure on line 64 in node/pkg/fetcher/fetcher_test.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value is not checked (errcheck)

b := bus.NewMessageBus()
fetcher := NewFetcher(b)
fetcher.initialize(context.Background())
err = fetcher.runAdapter(context.Background())
if err != nil {
t.Fatalf("error running adapter: %v", err)
}

// read aggregate from db
pgResult, err := db.QueryRow[Aggregate](context.Background(), "SELECT * FROM local_aggregates WHERE name = @name", map[string]any{"name": fetcher.Adapters[0].Name})
if err != nil {
t.Fatalf("error reading from db: %v", err)
}
assert.NotNil(t, pgResult)

// cleanup aggregate from db
err = db.QueryWithoutResult(context.Background(), "DELETE FROM local_aggregates WHERE name = @name", map[string]any{"name": fetcher.Adapters[0].Name})
if err != nil {
t.Fatalf("error cleaning up from db: %v", err)
}

// read aggregate from redis
rdbResult, err := db.Get(context.Background(), "latestAggregate:"+fetcher.Adapters[0].Name)
if err != nil {
t.Fatalf("error reading from redis: %v", err)
}
assert.NotNil(t, rdbResult)
var redisAgg redisAggregate
err = json.Unmarshal([]byte(rdbResult), &redisAgg)
if err != nil {
t.Fatalf("error unmarshalling from redis: %v", err)
}
assert.NotNil(t, redisAgg)
assert.NotNil(t, redisAgg.Value)

// remove aggregate from redis
err = db.Del(context.Background(), "latestAggregate:"+fetcher.Adapters[0].Name)
if err != nil {
t.Fatalf("error removing from redis: %v", err)
}
}
Loading

0 comments on commit 1d1d341

Please sign in to comment.