Skip to content

Commit

Permalink
geonet mock reward
Browse files Browse the repository at this point in the history
  • Loading branch information
huangzhiran committed Dec 23, 2024
1 parent c65b866 commit f69d7fc
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 20 deletions.
8 changes: 7 additions & 1 deletion cmd/apinode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"syscall"

"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"

"github.com/iotexproject/w3bstream/service/apinode"
Expand All @@ -27,7 +28,12 @@ func main() {
log.Fatal(err)
}

apinode := apinode.NewAPINode(cfg, db)
prv, err := crypto.HexToECDSA(cfg.PrvKey)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to parse private key"))
}

apinode := apinode.NewAPINode(cfg, db, prv)

if err := apinode.Start(); err != nil {
log.Fatal(err)
Expand Down
9 changes: 7 additions & 2 deletions e2e/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,24 @@ func apiNodeInit(chDSN, dbFile, chainEndpoint, taskManagerContractAddr, ioidCont
TaskAggregatorIntervalSecond: 1,
ProverServiceEndpoint: "localhost:9002",
DatabaseDSN: chDSN,
PrvKey: "",
PrvKey: "dbfe03b0406549232b8dccc04be8224fcc0afa300a33d4f335dcfdfead861c85",
ChainEndpoint: chainEndpoint,
BeginningBlockNumber: 0,
TaskManagerContractAddr: taskManagerContractAddr,
IoIDContractAddr: ioidContractAddr,
}

prv, err := crypto.HexToECDSA(cfg.PrvKey)
if err != nil {
return nil, "", err
}

db, err := apinodedb.New(dbFile, chDSN)
if err != nil {
return nil, "", err
}

node := apinode.NewAPINode(&cfg, db)
node := apinode.NewAPINode(&cfg, db, prv)
return node, fmt.Sprintf("http://localhost%s", cfg.ServiceEndpoint), nil
}

Expand Down
7 changes: 1 addition & 6 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,7 @@ func (c *contract) watch(listedBlockNumber uint64) {
}()
}

func Run(h *Handler, addr *ContractAddr, beginningBlockNumber uint64, chainEndpoint string) error {
client, err := ethclient.Dial(chainEndpoint)
if err != nil {
return errors.Wrap(err, "failed to dial chain endpoint")
}

func Run(h *Handler, addr *ContractAddr, beginningBlockNumber uint64, client *ethclient.Client) error {
c := &contract{
h: h,
addr: addr,
Expand Down
33 changes: 27 additions & 6 deletions service/apinode/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package aggregator

import (
"bytes"
"crypto/ecdsa"
"encoding/json"
"fmt"
"io"
"log/slog"
"math/big"
"net/http"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/iotexproject/w3bstream/service/apinode/db"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
adb "github.com/iotexproject/w3bstream/service/apinode/db"
"github.com/iotexproject/w3bstream/service/sequencer/api"
"github.com/iotexproject/w3bstream/smartcontracts/go/geonet"
"github.com/pkg/errors"
)

func Run(db *db.DB, sequencerAddr string, interval time.Duration) {
func Run(db *adb.DB, sequencerAddr string, interval time.Duration, geoInstance *geonet.Geonet, startGeo bool, prv *ecdsa.PrivateKey, chainID *big.Int) {
ticker := time.NewTicker(interval)
for range ticker.C {
ts, err := db.FetchAllTask()
Expand All @@ -33,13 +39,28 @@ func Run(db *db.DB, sequencerAddr string, interval time.Duration) {
time.Sleep(1 * time.Second) // after writing to clickhouse, reading immediately will not return the value.

tids := make([]string, 0, len(ts))
pt := map[string]string{}
pt := make(map[string]*adb.Task)
for _, t := range ts {
tids = append(tids, t.TaskID)
pt[t.ProjectID] = t.TaskID
pt[t.ProjectID] = t
}
for _, t := range pt {
if err := notify(sequencerAddr, common.HexToHash(t)); err != nil {
for pid, t := range pt {
if pid == "942" && startGeo {
tx, err := geoInstance.Tick(
&bind.TransactOpts{
From: crypto.PubkeyToAddress(prv.PublicKey),
Signer: func(a common.Address, t *types.Transaction) (*types.Transaction, error) {
return types.SignTx(t, types.NewLondonSigner(chainID), prv)
},
},
common.HexToAddress(t.DeviceID),
)
if err != nil {
slog.Error("failed to call geo contract", "error", err)
}
slog.Info("send tx to geo contract success", "hash", tx.Hash().String())
}
if err := notify(sequencerAddr, common.HexToHash(t.TaskID)); err != nil {
slog.Error("failed to notify sequencer", "error", err)
continue
}
Expand Down
24 changes: 21 additions & 3 deletions service/apinode/apinode.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
package apinode

import (
"context"
"crypto/ecdsa"
"log"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/iotexproject/w3bstream/monitor"
"github.com/iotexproject/w3bstream/service/apinode/aggregator"
"github.com/iotexproject/w3bstream/service/apinode/api"
"github.com/iotexproject/w3bstream/service/apinode/config"
"github.com/iotexproject/w3bstream/service/apinode/db"
"github.com/iotexproject/w3bstream/smartcontracts/go/geonet"
"github.com/pkg/errors"
)

type APINode struct {
cfg *config.Config
db *db.DB
prv *ecdsa.PrivateKey
}

func NewAPINode(cfg *config.Config, db *db.DB) *APINode {
func NewAPINode(cfg *config.Config, db *db.DB, prv *ecdsa.PrivateKey) *APINode {
return &APINode{
cfg: cfg,
db: db,
}
}

func (n *APINode) Start() error {
client, err := ethclient.Dial(n.cfg.ChainEndpoint)
if err != nil {
return errors.Wrap(err, "failed to dial chain endpoint")
}
if err := monitor.Run(
&monitor.Handler{
ScannedBlockNumber: n.db.ScannedBlockNumber,
Expand All @@ -39,12 +48,21 @@ func (n *APINode) Start() error {
IoID: common.HexToAddress(n.cfg.IoIDContractAddr),
},
n.cfg.BeginningBlockNumber,
n.cfg.ChainEndpoint,
client,
); err != nil {
return errors.Wrap(err, "failed to run contract monitor")
}

go aggregator.Run(n.db, n.cfg.SequencerServiceEndpoint, time.Duration(n.cfg.TaskAggregatorIntervalSecond)*time.Second)
geoInstance, err := geonet.NewGeonet(common.HexToAddress(n.cfg.GeoContractAddr), client)
if err != nil {
return errors.Wrap(err, "failed to new geo contract instance")
}
chainID, err := client.ChainID(context.Background())
if err != nil {
return errors.Wrap(err, "failed to get chain id")
}

go aggregator.Run(n.db, n.cfg.SequencerServiceEndpoint, time.Duration(n.cfg.TaskAggregatorIntervalSecond)*time.Second, geoInstance, n.cfg.StartGeo, n.prv, chainID)

go func() {
if err := api.Run(n.db, n.cfg.ServiceEndpoint, n.cfg.SequencerServiceEndpoint, n.cfg.ProverServiceEndpoint); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions service/apinode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Config struct {
BeginningBlockNumber uint64 `env:"BEGINNING_BLOCK_NUMBER,optional"`
TaskManagerContractAddr string `env:"TASK_MANAGER_CONTRACT_ADDRESS,optional"`
IoIDContractAddr string `env:"IOID_CONTRACT_ADDRESS,optional"`
GeoContractAddr string `env:"GEO_CONTRACT_ADDRESS,optional"`
StartGeo bool `env:"START_GEO,optional"`
LocalDBDir string `env:"LOCAL_DB_DIRECTORY,optional"`
env string `env:"-"`
}
Expand All @@ -34,6 +36,8 @@ var defaultTestnetConfig = &Config{
BeginningBlockNumber: 28685000,
TaskManagerContractAddr: "0xF0714400a4C0C72007A9F910C5E3007614958636",
IoIDContractAddr: "0x45Ce3E6f526e597628c73B731a3e9Af7Fc32f5b7",
GeoContractAddr: "0x3DC44D11238E54f83DD2bd90c75C45D8Fd6af1b3",
StartGeo: false,
LocalDBDir: "./local_db",
env: "TESTNET",
}
Expand Down
8 changes: 7 additions & 1 deletion service/prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors"

"github.com/iotexproject/w3bstream/datasource"
Expand Down Expand Up @@ -43,6 +44,11 @@ func NewProver(cfg *config.Config, db *db.DB, privateKey *ecdsa.PrivateKey) *Pro
}

func (p *Prover) Start() error {
client, err := ethclient.Dial(p.cfg.ChainEndpoint)
if err != nil {
return errors.Wrap(err, "failed to dial chain endpoint")
}

if err := monitor.Run(
&monitor.Handler{
ScannedBlockNumber: p.db.ScannedBlockNumber,
Expand All @@ -56,7 +62,7 @@ func (p *Prover) Start() error {
TaskManager: common.HexToAddress(p.cfg.TaskManagerContractAddr),
},
p.cfg.BeginningBlockNumber,
p.cfg.ChainEndpoint,
client,
); err != nil {
return errors.Wrap(err, "failed to run monitor")
}
Expand Down
8 changes: 7 additions & 1 deletion service/sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors"

"github.com/iotexproject/w3bstream/datasource"
Expand All @@ -31,6 +32,11 @@ func NewSequencer(cfg *config.Config, db *db.DB, prv *ecdsa.PrivateKey) *Sequenc
}

func (s *Sequencer) Start() error {
client, err := ethclient.Dial(s.cfg.ChainEndpoint)
if err != nil {
return errors.Wrap(err, "failed to dial chain endpoint")
}

if err := monitor.Run(
&monitor.Handler{
ScannedBlockNumber: s.db.ScannedBlockNumber,
Expand All @@ -44,7 +50,7 @@ func (s *Sequencer) Start() error {
TaskManager: common.HexToAddress(s.cfg.TaskManagerContractAddr),
},
s.cfg.BeginningBlockNumber,
s.cfg.ChainEndpoint,
client,
); err != nil {
return errors.Wrap(err, "failed to start monitor")
}
Expand Down
15 changes: 15 additions & 0 deletions smartcontracts/go/geonet/geonet.abi
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[
{
"inputs": [
{
"internalType": "address",
"name": "_device",
"type": "address"
}
],
"name": "tick",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
}
]
Loading

0 comments on commit f69d7fc

Please sign in to comment.