Skip to content

Commit

Permalink
update aggregator (#792)
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuhaai authored Dec 31, 2024
1 parent 1145b0b commit eef881a
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 45 deletions.
33 changes: 18 additions & 15 deletions datasource/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,36 @@ func (p *Clickhouse) Retrieve(taskIDs []common.Hash) ([]*task.Task, error) {
if len(taskIDs) == 0 {
return nil, errors.New("empty query task ids")
}
tids := make([]string, 0, len(taskIDs))
taskIDsHex := make([]string, 0, len(taskIDs))
for _, t := range taskIDs {
tids = append(tids, t.Hex())
taskIDsHex = append(taskIDsHex, t.Hex())
}
var ts []db.Task
if err := p.db.Select(context.Background(), &ts, "SELECT * FROM w3bstream_tasks WHERE task_id IN ?", tids); err != nil {
if err := p.db.Select(context.Background(), &ts, "SELECT * FROM w3bstream_tasks WHERE task_id IN ?", taskIDsHex); err != nil {
return nil, errors.Wrap(err, "failed to query tasks")
}

prevTaskIDs := map[string]bool{}
// filter out prev task that has been fetched
prevTasksPool := map[string]db.Task{}
for i := range ts {
prevTasksPool[ts[i].TaskID] = ts[i]
}
fetchPrevTaskIDs := make([]string, 0)
for i := range ts {
if ts[i].PrevTaskID != "" {
prevTaskIDs[ts[i].PrevTaskID] = true
if _, exist := prevTasksPool[ts[i].PrevTaskID]; !exist {
fetchPrevTaskIDs = append(fetchPrevTaskIDs, ts[i].PrevTaskID)
}
}
}
ptids := make([]string, 0, len(prevTaskIDs))
for t := range prevTaskIDs {
ptids = append(ptids, t)
}
pts := make(map[string]*db.Task, len(prevTaskIDs))
if len(ptids) != 0 {

if len(fetchPrevTaskIDs) != 0 {
var pdts []db.Task
if err := p.db.Select(context.Background(), &pdts, "SELECT * FROM w3bstream_tasks WHERE task_id IN ?", tids); err != nil {
if err := p.db.Select(context.Background(), &pdts, "SELECT * FROM w3bstream_tasks WHERE task_id IN ?", fetchPrevTaskIDs); err != nil {
return nil, errors.Wrap(err, "failed to query previous tasks")
}
for i := range pdts {
pts[pdts[i].TaskID] = &pdts[i]
prevTasksPool[pdts[i].TaskID] = pdts[i]
}
}

Expand All @@ -59,11 +62,11 @@ func (p *Clickhouse) Retrieve(taskIDs []common.Hash) ([]*task.Task, error) {
return nil, err
}
if ts[i].PrevTaskID != "" {
pdt, ok := pts[ts[i].PrevTaskID]
pdt, ok := prevTasksPool[ts[i].PrevTaskID]
if !ok {
return nil, errors.New("failed to get previous task")
}
pt, err := p.conv(pdt)
pt, err := p.conv(&pdt)
if err != nil {
return nil, err
}
Expand Down
14 changes: 10 additions & 4 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func TestE2E(t *testing.T) {
dataJson, err := json.Marshal(msgData)
require.NoError(t, err)

sendMessage(t, dataJson, projectID, nil, deviceKey, apiNodeUrl)
taskid := sendMessage(t, dataJson, projectID, nil, deviceKey, apiNodeUrl)
waitSettled(t, taskid, apiNodeUrl)
})

t.Run("GNARK", func(t *testing.T) {
Expand All @@ -191,19 +192,24 @@ func TestE2E(t *testing.T) {
data, err := hex.DecodeString("00000001000000010000000200000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000001")
require.NoError(t, err)

sendMessage(t, data, projectID, nil, deviceKey, apiNodeUrl)
taskid := sendMessage(t, data, projectID, nil, deviceKey, apiNodeUrl)
waitSettled(t, taskid, apiNodeUrl)
})
}

func sendMessage(t *testing.T, dataJson []byte, projectID *big.Int,
projectConfig *project.Config, deviceKey *ecdsa.PrivateKey, apiNodeUrl string) {
projectConfig *project.Config, deviceKey *ecdsa.PrivateKey, apiNodeUrl string) string {
reqBody, err := signMesssage(dataJson, projectID.Uint64(), projectConfig, deviceKey)
require.NoError(t, err)

taskID, err := createTask(reqBody, apiNodeUrl)
require.NoError(t, err)

err = waitUntil(func() (bool, error) {
return taskID
}

func waitSettled(t *testing.T, taskID string, apiNodeUrl string) {
err := waitUntil(func() (bool, error) {
states, err := queryTask(taskID, apiNodeUrl)
if err != nil {
return false, err
Expand Down
56 changes: 33 additions & 23 deletions service/apinode/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/iotexproject/w3bstream/service/apinode/db"
apidb "github.com/iotexproject/w3bstream/service/apinode/db"
"github.com/iotexproject/w3bstream/service/sequencer/api"
"github.com/pkg/errors"
)

func Run(db *db.DB, sequencerAddr string, interval time.Duration) {
func Run(db *apidb.DB, sequencerAddr string, interval time.Duration) {
ticker := time.NewTicker(interval)
for range ticker.C {
ts, err := db.FetchAllTask()
Expand All @@ -26,39 +26,49 @@ func Run(db *db.DB, sequencerAddr string, interval time.Duration) {
if len(ts) == 0 {
continue
}
var prevTaskID string

tasksbyProject := make(map[string][]*apidb.Task)
for i := range ts {
if ts[i].ProjectID == "942" {
if prevTaskID == "" {
prevTaskID = ts[i].TaskID
}
ts[i].PrevTaskID = prevTaskID
}
tasksbyProject[ts[i].ProjectID] = append(tasksbyProject[ts[i].ProjectID], ts[i])
}
if err := db.CreateTasks(ts); err != nil {
slog.Error("failed to create tasks", "error", err)
continue

if tasks, ok := tasksbyProject["942"]; ok && len(tasks) > 0 {
prevTaskID := tasks[0].TaskID
tasks[len(tasks)-1].PrevTaskID = prevTaskID
}
time.Sleep(1 * time.Second) // after writing to clickhouse, reading immediately will not return the value.

tids := make([]string, 0, len(ts))
pt := map[string]string{}
for _, t := range ts {
tids = append(tids, t.TaskID)
pt[t.ProjectID] = t.TaskID
if err := dumpTasks(db, ts); err != nil {
slog.Error("failed to dump tasks", "error", err)
continue
}
for _, t := range pt {
if err := notify(sequencerAddr, common.HexToHash(t)); err != nil {

for _, tasks := range tasksbyProject {
if len(tasks) == 0 {
continue
}
lastTask := tasks[len(tasks)-1]
if err := notify(sequencerAddr, common.HexToHash(lastTask.TaskID)); err != nil {
slog.Error("failed to notify sequencer", "error", err)
continue
}
}
if err := db.DeleteTasks(tids); err != nil {
slog.Error("failed to delete tasks at local", "error", err)
}
}
}

func dumpTasks(db *apidb.DB, ts []*apidb.Task) error {
// add tasks to remote
if err := db.CreateTasks(ts); err != nil {
slog.Error("failed to create tasks", "error", err)
return err
}
// remove tasks from local
if err := db.DeleteTasks(ts); err != nil {
slog.Error("failed to delete tasks at local", "error", err)
return err
}
return nil
}

func notify(sequencerAddr string, taskID common.Hash) error {
reqSequencer := &api.CreateTaskReq{TaskID: taskID}
reqSequencerJ, err := json.Marshal(reqSequencer)
Expand Down
7 changes: 5 additions & 2 deletions service/apinode/db/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ func (p *DB) CreateTasks(ts []*Task) error {
return errors.Wrap(err, "failed to append struct")
}
}
err = batch.Send()
return errors.Wrap(err, "failed to create tasks")
if err := batch.Send(); err != nil {
return errors.Wrap(err, "failed to create tasks")
}
time.Sleep(1 * time.Second) // after writing to clickhouse, reading immediately will not return the value.
return nil
}

func (p *DB) FetchTask(taskID common.Hash) (*Task, error) {
Expand Down
1 change: 1 addition & 0 deletions service/apinode/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"gorm.io/gorm"
)

// TODO: remove this combinator
type DB struct {
sqlite *gorm.DB
ch driver.Conn
Expand Down
6 changes: 5 additions & 1 deletion service/apinode/db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ func (p *DB) FetchAllTask() ([]*Task, error) {
return ts, nil
}

func (p *DB) DeleteTasks(taskIDs []string) error {
func (p *DB) DeleteTasks(ts []*Task) error {
taskIDs := make([]string, len(ts))
for i, t := range ts {
taskIDs[i] = t.TaskID
}
if len(taskIDs) == 0 {
return nil
}
Expand Down

0 comments on commit eef881a

Please sign in to comment.