Skip to content

Commit

Permalink
Fixes after review [stagingdeploy]
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Calza <[email protected]>
  • Loading branch information
brunocalza committed Feb 14, 2023
1 parent 2fdd5e2 commit 48b51c5
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 67 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/textileio/go-tableland

go 1.18
go 1.19

require (
cloud.google.com/go/bigquery v1.45.0
Expand Down
12 changes: 6 additions & 6 deletions pkg/eventprocessor/impl/eventprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (ep *EventProcessor) executeBlock(ctx context.Context, block eventfeed.Bloc
return fmt.Errorf("calculate hash: %s", err)
}

if err := ep.calculateTreeLeaves(ctx, bs, block.BlockNumber); err != nil {
if err := ep.snapshotTreeLeaves(ctx, bs, block.BlockNumber); err != nil {
return fmt.Errorf("calculate tree leaves: %s", err)
}

Expand Down Expand Up @@ -314,7 +314,7 @@ func (ep *EventProcessor) calculateHash(ctx context.Context, bs executor.BlockSc
Int64("elapsed_time", elapsedTime).
Msg("state hash")

ep.mTreeLeavesCalculationElapsedTime.Store(elapsedTime)
ep.mHashCalculationElapsedTime.Store(elapsedTime)

if err := telemetry.Collect(ctx, telemetry.StateHashMetric{
Version: telemetry.StateHashMetricV1,
Expand All @@ -328,17 +328,17 @@ func (ep *EventProcessor) calculateHash(ctx context.Context, bs executor.BlockSc
return nil
}

func (ep *EventProcessor) calculateTreeLeaves(ctx context.Context, bs executor.BlockScope, blockNumber int64) error {
func (ep *EventProcessor) snapshotTreeLeaves(ctx context.Context, bs executor.BlockScope, blockNumber int64) error {
startTime := time.Now()
if err := bs.CalculateTreeLeaves(ctx, ep.chainID); err != nil {
return fmt.Errorf("calculate tree leaves: %s", err)
if err := bs.SnapshotTableLeaves(ctx); err != nil {
return fmt.Errorf("snapshot tree leaves: %s", err)
}
elapsedTime := time.Since(startTime).Milliseconds()
ep.log.Info().
Int64("block_number", blockNumber).
Int64("chain_id", int64(ep.chainID)).
Int64("elapsed_time", elapsedTime).
Msg("state hash")
Msg("tree leaves snapshotting")

ep.mTreeLeavesCalculationElapsedTime.Store(elapsedTime)

Expand Down
4 changes: 2 additions & 2 deletions pkg/eventprocessor/impl/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type BlockScope interface {
// StateHash calculates the hash of some state of the database.
StateHash(ctx context.Context, chainID tableland.ChainID) (StateHash, error)

// CalculateTreeLeaves calculates the leaves for each table for future Merkle Tree building.
CalculateTreeLeaves(ctx context.Context, chainID tableland.ChainID) error
// SnapshotTableLeaves takes a snapshot of the leaves of each table for future Merkle Tree building.
SnapshotTableLeaves(ctx context.Context) error

// Commit commits all the changes that happened in previously successful ExecuteTxnEvents(...) calls.
Commit() error
Expand Down
58 changes: 41 additions & 17 deletions pkg/eventprocessor/impl/executor/impl/blockscope.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package impl

import (
"bytes"
"context"
"database/sql"
"fmt"
"hash/fnv"
"sort"
"strings"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -15,7 +18,6 @@ import (
"github.com/textileio/go-tableland/pkg/eventprocessor/eventfeed"
"github.com/textileio/go-tableland/pkg/eventprocessor/impl/executor"
"github.com/textileio/go-tableland/pkg/parsing"
"golang.org/x/crypto/sha3"
)

type blockScope struct {
Expand Down Expand Up @@ -216,13 +218,15 @@ func (bs *blockScope) StateHash(ctx context.Context, chainID tableland.ChainID)
return executor.NewStateHash(chainID, bs.scopeVars.BlockNumber, hash), nil
}

func (bs *blockScope) CalculateTreeLeaves(ctx context.Context, chainID tableland.ChainID) error {
rows, err := bs.txn.QueryContext(ctx, "select prefix, id from registry where chain_id = ?1 ORDER BY rowid", chainID)
func (bs *blockScope) SnapshotTableLeaves(ctx context.Context) error {
rows, err := bs.txn.QueryContext(ctx, "select prefix, id from registry where chain_id = ?1", bs.scopeVars.ChainID)
if err != nil {
return fmt.Errorf("fetching tables from registry: %s", err)
}
defer func() {
_ = rows.Close()
if err := rows.Close(); err != nil {
bs.log.Error().Err(err).Msg("closing the rows")
}
}()

for rows.Next() {
Expand All @@ -232,52 +236,72 @@ func (bs *blockScope) CalculateTreeLeaves(ctx context.Context, chainID tableland
return fmt.Errorf("scanning table name: %s", err)
}

if err := bs.calculateTreeLeavesForTable(ctx, chainID, tablePrefix, tableID); err != nil {
return fmt.Errorf("calculate leaves for table: %s", err)
if err := bs.snapshotTreeLeavesForTable(ctx, bs.scopeVars.ChainID, tablePrefix, tableID); err != nil {
return fmt.Errorf("snapshot leaves for table: %s", err)
}
}

if err := rows.Err(); err != nil {
return fmt.Errorf("encountered error during iteration: %s", err)
}

return nil
}

func (bs *blockScope) calculateTreeLeavesForTable(
func (bs *blockScope) snapshotTreeLeavesForTable(
ctx context.Context,
chainID tableland.ChainID,
tablePrefix string,
tableID int,
) error {
tableName := fmt.Sprintf("%s_%d_%d", tablePrefix, chainID, tableID)

// we don't need to sort the rows here because they will be sorted later inside the Merkle Tree library
tableRows, err := bs.txn.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s", tableName))
if err != nil {
return fmt.Errorf("fetching rows from %s: %s", tableName, err)
}
defer func() {
_ = tableRows.Close()
if err := tableRows.Close(); err != nil {
bs.log.Error().Err(err).Msg("closing the rows")
}
}()

cols, err := tableRows.Columns()
columns, err := tableRows.Columns()
if err != nil {
return fmt.Errorf("getting the columns of row: %s", err)
}

rawBuffer := make([]sql.RawBytes, len(cols))
scanCallArgs := make([]interface{}, len(rawBuffer))
for i := range rawBuffer {
scanCallArgs[i] = &rawBuffer[i]
columnValues := make([]sql.RawBytes, len(columns))
args := make([]interface{}, len(columnValues))
for i := range columnValues {
args[i] = &columnValues[i]
}

leaves := []byte{}
// using a non-cryptographic hash that outputs a hash of 16 bytes
rowHash := fnv.New128a()
for tableRows.Next() {
if err := tableRows.Scan(scanCallArgs...); err != nil {
if err := tableRows.Scan(args...); err != nil {
return fmt.Errorf("table row scan: %s", err)
}

rowHash := sha3.New256()
for _, col := range rawBuffer {
// We sort the column values to have a deterministic order of columns,
// because we cannot trust the order of 'SELECT *'.
sort.Slice(columnValues, func(i, j int) bool {
return bytes.Compare(columnValues[i], columnValues[j]) == -1
})

for _, col := range columnValues {
rowHash.Write(col)
}

leaves = append(leaves, rowHash.Sum(nil)...)
rowHash.Reset()
}

if err := tableRows.Err(); err != nil {
return fmt.Errorf("encountered error during iteration: %s", err)
}

if _, err := bs.txn.ExecContext(ctx,
Expand All @@ -288,7 +312,7 @@ func (bs *blockScope) calculateTreeLeavesForTable(
bs.scopeVars.BlockNumber,
leaves,
); err != nil {
return fmt.Errorf("inserting leaves %s: %s", tableName, err)
return fmt.Errorf("inserting tree leaves %s: %s", tableName, err)
}

return nil
Expand Down
50 changes: 30 additions & 20 deletions pkg/merkletree/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,44 @@ This package implements a Merkle Tree. It is used to calculate the Merkle Root o
## Design characteristics

- It was designed to work with [OpenZepellin MerkeProof verifier](https://github.com/OpenZeppelin/openzeppelin-contracts/blob/260e082ed10e86e5870c4e5859750a8271eeb2b9/contracts/utils/cryptography/MerkleProof.sol#L27-L29). That requires leaves and hash pairs to be sorted. The sorting simplifies the proof verification by removing the need to include information about the order of the proof piece.
- It duplicates the last leaf node in case the number of leaves is odd. This is a common approach but vunerable to a forgery attack because two trees can produce the same Merkle Root, e.g. `MerkleRoot(a, b, c) = MerkleRoot(a, b, c, c)`. Not sure that will be a problem in our usecase.
- It duplicates the last leaf node in case the number of leaves is odd. This is a common approach but vunerable to a forgery attack because two trees can produce the same Merkle Root, e.g. `MerkleRoot(a, b, c) = MerkleRoot(a, b, c, c)`. But that is not a problem in our use case.

## Usage

```go
leaves := [][]byte{}
leaves = append(leaves, []byte("A"))
leaves = append(leaves, []byte("B"))
leaves = append(leaves, []byte("C"))
leaves = append(leaves, []byte("D"))
leaves = append(leaves, []byte("E"))
leaves = append(leaves, []byte("F"))

tree, _ := merkletree.NewTree(leaves, crypto.Keccak256)
fmt.Printf("ROOT: %s\n\n", hex.EncodeToString(tree.MerkleRoot()))

proof := tree.GetProof(crypto.Keccak256([]byte("D")))
for i, part := range proof {
fmt.Printf("PROOF (%d): 0x%s\n", i, hex.EncodeToString(part))
}
leaves := [][]byte{}
leaves = append(leaves, []byte("A"))
leaves = append(leaves, []byte("B"))
leaves = append(leaves, []byte("C"))
leaves = append(leaves, []byte("D"))
leaves = append(leaves, []byte("E"))
leaves = append(leaves, []byte("F"))
tree, _ := merkletree.NewTree(leaves, crypto.Keccak256)

// Getting the root
root := tree.MerkleRoot()
fmt.Printf("ROOT: %s\n\n", hex.EncodeToString(root))

// Getting the proof for a given leaf
leaf := crypto.Keccak256([]byte("D"))
proof := tree.GetProof(leaf)
for i, part := range proof {
fmt.Printf("PROOF (%d): 0x%s\n", i, hex.EncodeToString(part))
}

// Verifying the proof
ok := merkletree.VerifyProof(root, proof, leaf, crypto.Keccak256)
fmt.Printf("\nVERIFICATION RESULT: %t\n", ok)
```

```bash
ROOT: 8550ee72696375131758acb925f2aac02f94a06c7f796d9560df9aeb72f222d1
ROOT: 5b4f920caf9a50816be944fd3626945ebaed5fcd1f041fa864027d4eaad29cf6

PROOF (0): 0x017e667f4b8c174291d1543c466717566e206df1bfd6f30271055ddafdb18f72
PROOF (1): 0x69de756fea16daddbbdccf85c849315f51c0b50d111e3d2063cab451803324a0
PROOF (2): 0x7f61b8bf6780bd017acc22aebf6e14d93aaca4d8b7d0b8fdbb22de1d8cc08126
PROOF (0): 0xe61d9a3d3848fb2cdd9a2ab61e2f21a10ea431275aed628a0557f9dee697c37a
PROOF (1): 0x324d51074ba12c3b56f59e6a9dd606351316426b7f7d924b1fc9efa7f261b476
PROOF (2): 0xd8c26fda8cf7503459d00730efe60ff9ec19bf97b7a26b6aa42fa8d8337efe78

VERIFICATION RESULT: true
```

## Things to explore
Expand Down
121 changes: 121 additions & 0 deletions pkg/merkletree/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package publisher

import (
"context"
"fmt"
"sync"
"time"

logger "github.com/rs/zerolog/log"
"github.com/textileio/go-tableland/pkg/merkletree"
"github.com/textileio/go-tableland/pkg/wallet"
)

// MerkleRootPublisher is responsible for building Merkle Tree and publishing the root.
type MerkleRootPublisher struct {
store LeavesStore // where leaves are stored
registry MerkleRootRegistry // where root will be published

wallet *wallet.Wallet
interval time.Duration
fetchAmount int

quitOnce sync.Once
quit chan struct{}
}

// NewMerkleRootPublisher creates a new publisher.
func NewMerkleRootPublisher(s MerkleRootRegistry, wallet *wallet.Wallet, interval time.Duration) *MerkleRootPublisher {
return &MerkleRootPublisher{
wallet: wallet,
interval: interval,
fetchAmount: 100,
quit: make(chan struct{}),
}
}

var log = logger.With().
Str("component", "merkletreepublisher").
Logger()

// Start starts the publisher.
func (p *MerkleRootPublisher) Start() {
ctx := context.Background()

ticker := time.NewTicker(p.interval)
go func() {
for {
select {
case <-ticker.C:
if err := p.publish(ctx); err != nil {
log.Err(err).Msg("failed to publish merkle root")
}
case <-p.quit:
log.Info().Msg("quiting merkle root publisher")
ticker.Stop()
return
}
}
}()
}

// Close closes the published goroutine.
func (p *MerkleRootPublisher) Close() {
p.quitOnce.Do(func() {
p.quit <- struct{}{}
close(p.quit)
})
}

func (p *MerkleRootPublisher) publish(ctx context.Context) error {
tables, err := p.store.FetchLatest(ctx)
if err != nil {
return fmt.Errorf("fetch unpublished metrics: %s", err)
}

if len(tables) == 0 {
return nil
}

for _, table := range tables {
chunks := chunker(table.Leaves, 32)

tree, err := merkletree.NewTree(chunks, nil)
if err != nil {
return err
}

if err := p.registry.Publish(ctx, table.TableID, table.HeightID, tree.MerkleRoot()); err != nil {
return err
}
}

return nil
}

type TreeLeaves struct {
Leaves []byte
ChainID int64
TableID int64
HeightID int64
TablePrefix string
}

// LeavesStore defines the API for fetching leaves from trees that need to be buil
type LeavesStore interface {
FetchLatest(context.Context) ([]TreeLeaves, error)
}

// MerkleRootRegistry defines the API for publishing root.
type MerkleRootRegistry interface {
Publish(context.Context, int64, int64, []byte) error
}

func chunker(data []byte, size int) [][]byte {
chunks := make([][]byte, len(data)/size)
for i := 0; i < len(data); i += size {
chunks[i/size] = data[i : i+size]
}

return chunks
}
Loading

0 comments on commit 48b51c5

Please sign in to comment.