Skip to content

Commit

Permalink
Implements a merkle tree root publisher [stagingdeploy]
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Calza <[email protected]>

Signed-off-by: Bruno Calza <[email protected]>
  • Loading branch information
brunocalza committed Feb 24, 2023
1 parent 1950c27 commit 347f5bb
Show file tree
Hide file tree
Showing 23 changed files with 726 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ run:
timeout: 30m

skip-dirs:
- "pkg/sqlstore/impl/system/internal/db"
- "pkg/sqlstore/impl/system/db"
- "internal/router/controllers/apiv1"
35 changes: 35 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ import (
"github.com/textileio/go-tableland/pkg/telemetry/storage"
"github.com/textileio/go-tableland/pkg/wallet"
"go.opentelemetry.io/otel/attribute"

merklepublisher "github.com/textileio/go-tableland/pkg/merkletree/publisher"
merklepublisherimpl "github.com/textileio/go-tableland/pkg/merkletree/publisher/impl"
)

type moduleCloser func(ctx context.Context) error
Expand All @@ -70,6 +73,11 @@ func main() {
path.Join(dirPath, "database.db"),
)

db, err := sqlstoreimpl.NewSQLiteDB(databaseURL)
if err != nil {
log.Fatal().Err(err).Msg("configuring telemetry")
}

// Restore provided backup (if configured).
if config.BootstrapBackupURL != "" {
if err := restoreBackup(databaseURL, config.BootstrapBackupURL); err != nil {
Expand Down Expand Up @@ -119,6 +127,9 @@ func main() {
}
}

// Merkle Tree publisher.
closeMerkleTreePublisherModule := configureMerkleTreePublisher(db)

// Telemetry
closeTelemetryModule, err := configureTelemetry(dirPath, chainStacks, config.TelemetryPublisher)
if err != nil {
Expand Down Expand Up @@ -152,6 +163,15 @@ func main() {
log.Error().Err(err).Msg("closing user store")
}

if err := db.Close(); err != nil {
log.Error().Err(err).Msg("closing sqlite db")
}

// Close merkle tree publisher.
if err := closeMerkleTreePublisherModule(ctx); err != nil {
log.Error().Err(err).Msg("closing merkle tree publisher module")
}

// Close telemetry.
if err := closeTelemetryModule(ctx); err != nil {
log.Error().Err(err).Msg("closing telemetry module")
Expand Down Expand Up @@ -288,6 +308,21 @@ func createChainIDStack(
}, nil
}

func configureMerkleTreePublisher(db *sqlstoreimpl.SQLiteDB) moduleCloser {
store := merklepublisherimpl.NewLeavesStore(db)
p := merklepublisher.NewMerkleRootPublisher(
store,
merklepublisherimpl.NewMerkleRootRegistryLogger(log.Logger),
time.Second,
)
p.Start()

return func(_ context.Context) error {
p.Close()
return nil
}
}

func configureTelemetry(
dirPath string,
chainStacks map[tableland.ChainID]chains.ChainStack,
Expand Down
9 changes: 7 additions & 2 deletions pkg/eventprocessor/impl/executor/impl/blockscope.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (bs *blockScope) SnapshotTableLeaves(ctx context.Context) error {

for rows.Next() {
var tablePrefix string
var tableID int
var tableID int64
if err := rows.Scan(&tablePrefix, &tableID); err != nil {
return fmt.Errorf("scanning table name: %s", err)
}
Expand All @@ -252,7 +252,7 @@ func (bs *blockScope) snapshotTreeLeavesForTable(
ctx context.Context,
chainID tableland.ChainID,
tablePrefix string,
tableID int,
tableID int64,
) error {
tableName := fmt.Sprintf("%s_%d_%d", tablePrefix, chainID, tableID)

Expand Down Expand Up @@ -304,6 +304,11 @@ func (bs *blockScope) snapshotTreeLeavesForTable(
return fmt.Errorf("encountered error during iteration: %s", err)
}

if len(leaves) == 0 {
bs.log.Warn().Int64("chain_id", int64(bs.scopeVars.ChainID)).Int64("table_id", tableID).Msg("empty row")
return nil
}

if _, err := bs.txn.ExecContext(ctx,
"INSERT INTO system_tree_leaves (prefix, chain_id, table_id, block_number, leaves) VALUES (?1, ?2, ?3, ?4, ?5)",
tablePrefix,
Expand Down
90 changes: 90 additions & 0 deletions pkg/merkletree/publisher/impl/leaves_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package impl

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/textileio/go-tableland/pkg/merkletree/publisher"
"github.com/textileio/go-tableland/pkg/sqlstore/impl"
"github.com/textileio/go-tableland/pkg/sqlstore/impl/system/db"
)

// LeavesStore responsible for interacting with system_tree_leaves table.
type LeavesStore struct {
log zerolog.Logger
db *impl.SQLiteDB
}

// NewLeavesStore returns a new LeavesStore backed by database/sql.
func NewLeavesStore(sqlite *impl.SQLiteDB) *LeavesStore {
log := sqlite.Log.With().
Str("component", "leavesstore").
Logger()

leavesstore := &LeavesStore{
log: log,
db: sqlite,
}

return leavesstore
}

// FetchLeavesByChainIDAndBlockNumber fetches chain ids and block numbers to be processed.
func (s *LeavesStore) FetchLeavesByChainIDAndBlockNumber(
ctx context.Context,
chainID int64,
blockNumber int64,
) ([]publisher.TreeLeaves, error) {
rows, err := s.db.Queries.FetchLeavesByChainIDAndBlockNumber(ctx, db.FetchLeavesByChainIDAndBlockNumberParams{
ChainID: chainID,
BlockNumber: blockNumber,
})
if err != nil {
return []publisher.TreeLeaves{}, fmt.Errorf("fetching leaves by chain id and block number: %s", err)
}

leaves := make([]publisher.TreeLeaves, len(rows))
for i, row := range rows {
leaves[i] = publisher.TreeLeaves{
ChainID: row.ChainID,
BlockNumber: row.BlockNumber,
TableID: row.TableID,
TablePrefix: row.Prefix,
Leaves: row.Leaves,
}
}

return leaves, nil
}

// FetchChainIDAndBlockNumber fetches rows from leaves store by chain id and block number.
func (s *LeavesStore) FetchChainIDAndBlockNumber(ctx context.Context) ([]publisher.ChainIDBlockNumberPair, error) {
rows, err := s.db.Queries.FetchChainIDAndBlockNumber(ctx)
if err != nil {
return []publisher.ChainIDBlockNumberPair{}, fmt.Errorf("fetching chain id and block number: %s", err)
}

pairs := make([]publisher.ChainIDBlockNumberPair, len(rows))
for i, row := range rows {
pairs[i] = publisher.ChainIDBlockNumberPair{
ChainID: row.ChainID,
BlockNumber: row.BlockNumber,
}
}

return pairs, nil
}

// DeleteProcessing deletes rows that are marked as processing.
func (s *LeavesStore) DeleteProcessing(ctx context.Context, chainID int64, blockNumber int64) error {
if err := s.db.Queries.DeleteProcessing(ctx, db.DeleteProcessingParams{
ChainID: chainID,
BlockNumber: blockNumber,
}); err != nil {
return fmt.Errorf("delete processing: %s", err)
}

return nil
}
142 changes: 142 additions & 0 deletions pkg/merkletree/publisher/impl/publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package impl

import (
"bytes"
"encoding/json"
"sync"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"github.com/textileio/go-tableland/pkg/merkletree/publisher"
"github.com/textileio/go-tableland/pkg/sqlstore/impl"
"github.com/textileio/go-tableland/tests"
)

func TestPublisher(t *testing.T) {
// We are going to pass this logger to MerkleRootRegistryLogger.
// It will fill the `buf` with logged bytes, that later we can inspect that it logged the expected values.
var buf buffer
logger := zerolog.New(&buf).With().Timestamp().Logger()

helper := setup(t, []publisher.TreeLeaves{
{
TablePrefix: "",
ChainID: 1,
TableID: 1,
BlockNumber: 1,
Leaves: []byte("ABCDEFGHABCDEFGH"),
},
{
TablePrefix: "",
ChainID: 1,
TableID: 2,
BlockNumber: 1,
Leaves: []byte("ABCDEFGHABCDEFGH"),
},
})

p := publisher.NewMerkleRootPublisher(helper.store, NewMerkleRootRegistryLogger(logger), time.Second)
p.Start()
defer p.Close()

type l struct {
ChainID int `json:"chain_id"`
BlockNumber int `json:"block_number"`
Level string `json:"level"`
Message string `json:"message"`
Root1 string `json:"root_1"`
Root2 string `json:"root_2"`
Tables []int `json:"tables"`
}

// Eventually the MerkleRootLogger will build the tree and emit the expected log.
require.Eventually(t, func() bool {
// We're going to inspect `buf`.
if buf.Len() != 0 {
expLog := &l{}
decoder := json.NewDecoder(bytes.NewReader(buf.Bytes()))
require.NoError(t, decoder.Decode(expLog))

require.Equal(t, 1, expLog.ChainID)
require.Equal(t, 1, expLog.BlockNumber)
require.Equal(t, "info", expLog.Level)
require.Equal(t, "merkle roots", expLog.Message)
require.Equal(t, "8b8e53316fb13d0bfe0e559e947f729af5296981a47095be51054afae8e48ab1", expLog.Root1)
require.Equal(t, "8b8e53316fb13d0bfe0e559e947f729af5296981a47095be51054afae8e48ab1", expLog.Root2)
require.Equal(t, []int{1, 2}, expLog.Tables)

helper.assertTreeLeavesIsEmpty(t)
}
return buf.Len() != 0
}, 10*time.Second, time.Second)
}

func setup(t *testing.T, data []publisher.TreeLeaves) *helper {
t.Helper()

sqlite, err := impl.NewSQLiteDB(tests.Sqlite3URI(t))
require.NoError(t, err)

// pre populate system_tree_leaves with provided data
for _, treeLeaves := range data {
_, err = sqlite.DB.Exec(
"INSERT INTO system_tree_leaves (prefix, chain_id, table_id, block_number, leaves) VALUES (?1, ?2, ?3, ?4, ?5)",
treeLeaves.TablePrefix,
treeLeaves.ChainID,
treeLeaves.TableID,
treeLeaves.BlockNumber,
treeLeaves.Leaves,
)
require.NoError(t, err)
}

return &helper{
db: sqlite,
store: NewLeavesStore(sqlite),
}
}

type helper struct {
db *impl.SQLiteDB
store *LeavesStore
}

func (h *helper) assertTreeLeavesIsEmpty(t *testing.T) {
var count int
err := h.db.DB.QueryRow("SELECT count(1) FROM system_tree_leaves").Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)
}

// We need a thread-safe version of bytes.Buffer to avoid data races in this test.
// The reason for that is because there's a thread writing to the buffer and another one reading from it.
type buffer struct {
b bytes.Buffer
m sync.Mutex
}

func (b *buffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Read(p)
}

func (b *buffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}

func (b *buffer) Bytes() []byte {
b.m.Lock()
defer b.m.Unlock()
return b.b.Bytes()
}

func (b *buffer) Len() int {
b.m.Lock()
defer b.m.Unlock()
return b.b.Len()
}
40 changes: 40 additions & 0 deletions pkg/merkletree/publisher/impl/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package impl

import (
"context"
"fmt"

"github.com/rs/zerolog"
)

// MerkleRootRegistryLogger is an implementat that simply logs the roots.
type MerkleRootRegistryLogger struct {
logger zerolog.Logger
}

// NewMerkleRootRegistryLogger creates a new MerkleRootRegistryLogger.
func NewMerkleRootRegistryLogger(logger zerolog.Logger) *MerkleRootRegistryLogger {
return &MerkleRootRegistryLogger{logger: logger}
}

// Publish logs the roots.
func (r *MerkleRootRegistryLogger) Publish(
_ context.Context,
chainID int64,
blockNumber int64,
tables []int64,
roots [][]byte,
) error {
l := r.logger.Info().
Int64("chain_id", chainID).
Int64("block_number", blockNumber).
Ints64("tables", tables)

for i, root := range roots {
l.Hex(fmt.Sprintf("root_%d", tables[i]), root)
}

l.Msg("merkle roots")

return nil
}
Loading

0 comments on commit 347f5bb

Please sign in to comment.