Skip to content

Commit

Permalink
chore(rpc): optimize genesis file chunking (cometbft#4349)
Browse files Browse the repository at this point in the history
Closes cometbft#1289.

### Changes
#### `Environment` type
The `Environment` type will:
- store the genesis file's path on disk
- store a `map[int]string` where the keys are the chunks' IDs, and the
values are the chunk's path on disk.
- not store a pointer to a `types.GenesisDoc` anymore
- not store a slice of genesis file chunks anymore. Instead the method
`InitGenesisChunks` will create chunks and store them on disk, 1 file
per chunk.

#### `Node` type
The `Node` type will:
- not store a pointer to a `GenesisDoc` anymore. 
- pass the genesis file's path on disk to a new `Environment` object
when creating it in the `ConfigureRPC()` method.

#### RPC `/genesis_chunked`
This endpoint will implement the chunking strategy described
[here](cometbft#1290 (comment)).

---

#### PR checklist

- [x] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [x] Updated relevant documentation (`docs/` or `spec/`) and code
comments

---------

Co-authored-by: Andy Nogueira <[email protected]>
Co-authored-by: Anton Kaliaev <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Daniel <[email protected]>
  • Loading branch information
5 people authored Oct 29, 2024
1 parent b1dcc58 commit 48db7ac
Show file tree
Hide file tree
Showing 9 changed files with 775 additions and 206 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
- `[node]` Don't store a pointer to a `types.GenesisDoc` after the node is running
startup ([\#4250](https://github.com/cometbft/cometbft/pull/4250))
- `[node]` Don't store a pointer to a `types.GenesisDoc` after the node is running ([\#4250](https://github.com/cometbft/cometbft/pull/4250))
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
- `[rpc]` Store either a pointer to a GenesisDoc or the genesis' chunks, but not both in Environment (RPC API)
a GenesisDoc or the genesis' chunks, but not both.
([\#4235](https://github.com/cometbft/cometbft/pull/4235))
- `[rpc]` Store either a pointer to a `GenesisDoc` or the genesis' chunks, but not both in Environment (RPC API) ([\#4235](https://github.com/cometbft/cometbft/pull/4235))
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Node no longer stores genesis in memory after startup
([\#4349](https://github.com/cometbft/cometbft/pull/4349))
33 changes: 14 additions & 19 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,7 @@ type Node struct {
service.BaseService

// config
config *cfg.Config

// genesisDoc stores the initial validator set.
// NOTE: this pointer will be set to nil once startup is done. In future work
// we plan to remove this field altogether so that the genesis isn't stored in
// memory at runtime.
genesisDoc *types.GenesisDoc
config *cfg.Config
genesisTime time.Time
privValidator types.PrivValidator // local node's validator key

Expand Down Expand Up @@ -554,7 +548,6 @@ func NewNodeWithCliParams(ctx context.Context,

node := &Node{
config: config,
genesisDoc: genDoc,
genesisTime: genDoc.GenesisTime,
privValidator: privValidator,

Expand Down Expand Up @@ -616,7 +609,7 @@ func (n *Node) OnStart() error {
if n.config.RPC.ListenAddress != "" {
listeners, err := n.startRPC()
if err != nil {
return err
return fmt.Errorf("starting RPC server: %w", err)
}
n.rpcListeners = listeners
}
Expand Down Expand Up @@ -662,8 +655,6 @@ func (n *Node) OnStart() error {
return ErrStartPruning{Err: err}
}

n.genesisDoc = nil

return nil
}

Expand Down Expand Up @@ -785,7 +776,6 @@ func (n *Node) ConfigureRPC() (*rpccore.Environment, error) {
P2PTransport: n,
PubKey: pubKey,

GenDoc: n.genesisDoc,
TxIndexer: n.txIndexer,
BlockIndexer: n.blockIndexer,
ConsensusReactor: n.consensusReactor,
Expand All @@ -796,12 +786,13 @@ func (n *Node) ConfigureRPC() (*rpccore.Environment, error) {
Logger: n.Logger.With("module", "rpc"),

Config: *n.config.RPC,

GenesisFilePath: n.config.GenesisFile(),
}

n.Logger.Info("Creating genesis file chunks if genesis file is too big...")

if err := _rpcEnv.InitGenesisChunks(); err != nil {
errToReturn = fmt.Errorf("setting up RPC API environment: %s", err)
errToReturn = fmt.Errorf("configuring RPC API environment: %w", err)
return
}
})
Expand Down Expand Up @@ -873,25 +864,29 @@ func (n *Node) startRPC() ([]net.Listener, error) {
}
if n.config.RPC.IsTLSEnabled() {
go func() {
if err := rpcserver.ServeTLS(
err := rpcserver.ServeTLSWithShutdown(
listener,
rootHandler,
n.config.RPC.CertFile(),
n.config.RPC.KeyFile(),
rpcLogger,
config,
); err != nil {
n.Logger.Error("Error serving server with TLS", "err", err)
env.Cleanup,
)
if err != nil {
n.Logger.Error("serving server with TLS", "err", err)
}
}()
} else {
go func() {
if err := rpcserver.Serve(
err := rpcserver.ServeWithShutdown(
listener,
rootHandler,
rpcLogger,
config,
); err != nil {
env.Cleanup,
)
if err != nil {
n.Logger.Error("Error serving server", "err", err)
}
}()
Expand Down
212 changes: 167 additions & 45 deletions rpc/core/env.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package core

import (
"encoding/base64"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"time"

abcicli "github.com/cometbft/cometbft/abci/client"
cfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/crypto"
cmtjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
mempl "github.com/cometbft/cometbft/mempool"
"github.com/cometbft/cometbft/p2p"
Expand All @@ -33,6 +36,8 @@ const (
// genesisChunkSize is the maximum size, in bytes, of each
// chunk in the genesis structure for the chunked API.
genesisChunkSize = 2 * 1024 * 1024 // 2 MB

_chunksDir = "genesis-chunks"
)

// These interfaces are used by RPC and must be thread safe
Expand Down Expand Up @@ -95,7 +100,6 @@ type Environment struct {

// objects
PubKey crypto.PubKey
GenDoc *types.GenesisDoc // cache the genesis structure
TxIndexer txindex.TxIndexer
BlockIndexer indexer.BlockIndexer
EventBus *types.EventBus // thread safe
Expand All @@ -105,70 +109,82 @@ type Environment struct {

Config cfg.RPCConfig

// cache of chunked genesis data.
genChunks []string
GenesisFilePath string // the genesis file's full path on disk

// genesisChunk is a map of chunk ID to its full path on disk.
// If the genesis file is smaller than genesisChunkSize, then this map will be
// nil, because there will be no chunks on disk.
// This map is convenient for the `/genesis_chunked` API to quickly find a chunk
// by its ID, instead of having to reconstruct its path each time, which would
// involve multiple string operations.
genesisChunksFiles map[int]string
}

// InitGenesisChunks checks whether it makes sense to create a cache of chunked
// genesis data. It is called on Node startup and should be called only once.
// InitGenesisChunks checks whether it makes sense to split the genesis file into
// small chunks to be stored on disk.
// It is called on Node startup and should be called only once.
// Rules of chunking:
// - if the genesis file's size is <= genesisChunkSize, then no chunking.
// An `Environment` object will store a pointer to the genesis in its GenDoc
// field. Its genChunks field will be set to nil. `/genesis` RPC API will return
// the GenesisDoc itself.
// - if the genesis file's size is > genesisChunkSize, then use chunking. An
// `Environment` object will store a slice of base64-encoded chunks in its
// genChunks field. Its GenDoc field will be set to nil. `/genesis` RPC API will
// redirect users to use the `/genesis_chunked` API.
// - if the genesis file's size is <= genesisChunkSize, this function returns
// without doing anything. The `/genesis` RPC API endpoint will fetch the genesis
// file from disk to serve requests.
// - if the genesis file's size is > genesisChunkSize, then use chunking. The
// function splits the genesis file into chunks of genesisChunkSize and stores
// each chunk on disk. The `/genesis_chunked` RPC API endpoint will fetch the
// genesis file chunks from disk to serve requests.
func (env *Environment) InitGenesisChunks() error {
if len(env.genChunks) > 0 {
if len(env.genesisChunksFiles) > 0 {
// we already computed the chunks, return.
return nil
}

if env.GenDoc == nil {
gFilePath := env.GenesisFilePath
if len(gFilePath) == 0 {
// chunks not computed yet, but no genesis available.
// This should not happen.
return errors.New("could not create the genesis file chunks and cache them because the genesis doc is unavailable")
return errors.New("missing genesis file path on disk")
}

data, err := cmtjson.Marshal(env.GenDoc)
gFileSize, err := fileSize(gFilePath)
if err != nil {
return fmt.Errorf("encoding genesis doc to JSON: %w", err)
return fmt.Errorf("gauging genesis file size: %w", err)
}

// If genesis is less than 16MB, then no chunking.
// Keep a pointer to a GenesisDoc in env.GenDoc.
if len(data) <= genesisChunkSize {
env.genChunks = nil
if gFileSize <= genesisChunkSize {
// no chunking required
return nil
}

var (
nChunks = (len(data) + genesisChunkSize - 1) / genesisChunkSize
chunks = make([]string, nChunks)
)
for i := range nChunks {
var (
start = i * genesisChunkSize
end = start + genesisChunkSize
)
if end > len(data) {
end = len(data)
}
gChunksDir, err := mkChunksDir(gFilePath, _chunksDir)
if err != nil {
return fmt.Errorf("preparing chunks directory: %w", err)
}

// we make a copy here so that the original data isn't retained in memory.
// The GC will collect the data slice after exiting the function.
// Without the copy, it would keep it in memory.
chunk := make([]byte, end-start)
copy(chunk, data[start:end])
chunks[i] = base64.StdEncoding.EncodeToString(chunk)
// chunking required
chunkIDToPath, err := writeChunks(gFilePath, gChunksDir, genesisChunkSize)
if err != nil {
return fmt.Errorf("chunking large genesis file: %w", err)
}

env.genChunks = chunks
env.genesisChunksFiles = chunkIDToPath

return nil
}

// we store the chunks; don't store a ptr to the genesis anymore.
env.GenDoc = nil
// Cleanup deletes the directory storing the genesis file chunks on disk
// if it exists. If the directory does not exist, the function is a no-op.
// The chunks' directory is a sub-directory of the `config/` directory of the
// running node (i.e., where the genesis.json file is stored).
// We call the function:
// - before creating new genesis file chunks, to make sure we start with a clean
// directory.
// - when a Node shuts down, to clean up the file system.
func (env *Environment) Cleanup() error {
gFileDir := filepath.Dir(env.GenesisFilePath)
chunksDir := filepath.Join(gFileDir, _chunksDir)

if err := os.RemoveAll(chunksDir); err != nil {
return fmt.Errorf("deleting genesis file chunks' folder: %w", err)
}

return nil
}
Expand Down Expand Up @@ -245,3 +261,109 @@ func (env *Environment) latestUncommittedHeight() int64 {
}
return env.BlockStore.Height() + 1
}

// fileSize returns the size of the file at the given path.
func fileSize(fPath string) (int, error) {
// we use os.Stat here instead of os.ReadFile, because we don't want to load
// the entire file into memory just to compute its size from the resulting
// []byte slice.
fInfo, err := os.Stat(fPath)
if errors.Is(err, fs.ErrNotExist) {
return 0, fmt.Errorf("the file is unavailable at %s", fPath)
} else if err != nil {
return 0, fmt.Errorf("accessing file: %w", err)
}
return int(fInfo.Size()), nil
}

// mkChunksDir creates a new directory to store the genesis file's chunks.
// gFilePath is the genesis file's full path on disk.
// dirName is the name of the directory to be created, not it's path on disk.
// mkChunksDir will create a directory named 'dirName' as a sub-directory of the
// genesis file's directory (gFileDir).
// It returns the new directory's full path or an empty string if there is an
// error.
func mkChunksDir(gFilePath string, dirName string) (string, error) {
var (
gFileDir = filepath.Dir(gFilePath)
dirPath = filepath.Join(gFileDir, dirName)
)
if _, err := os.Stat(dirPath); err == nil {
// directory already exists; this might happen it the node crashed and
// could not do cleanup. Delete it to start from scratch.
if err := os.RemoveAll(dirPath); err != nil {
return "", fmt.Errorf("deleting existing chunks directory: %w", err)
}
} else if !os.IsNotExist(err) {
return "", fmt.Errorf("accessing directory: %w", err)
}

if err := os.Mkdir(dirPath, 0o700); err != nil {
return "", fmt.Errorf("creating chunks directory: %s", err)
}

return dirPath, nil
}

// writeChunk writes a chunk of the genesis file to disk, saving it to dir.
// Each chunk file name's format will be: chunk_[chunkID].part, e.g., chunk_42.part.
func writeChunk(chunk []byte, dir string, chunkID int) (string, error) {
var (
chunkName = "chunk_" + strconv.Itoa(chunkID) + ".part"
chunkPath = filepath.Join(dir, chunkName)
)
if err := os.WriteFile(chunkPath, chunk, 0o600); err != nil {
return "", fmt.Errorf("writing chunk to disk: %w", err)
}

return chunkPath, nil
}

// writeChunks reads the genesis file in chunks of size chunkSize, and writes them
// to disk.
// gFilePath is the genesis file's full path on disk.
// gChunksDir is the directory where the chunks will be stored on disk.
// chunkSize is the size of a chunk, that is, writeChunks will read the genesis file
// in chunks of size chunkSize.
// It returns a map where the keys are the chunk IDs, and the values are the chunks'
// path on disk. E.g.,:
// map[0] = $HOME/.cometbft/config/genesis-chunks/chunk_0.part
// map[1] = $HOME/.cometbft/config/genesis-chunks/chunk_1.part
// and so on for all chunks.
// The map will be useful for the `/genesis_chunked` RPC endpoint to quickly find
// a chunk on disk given its ID.
func writeChunks(
gFilePath, gChunksDir string,
chunkSize int,
) (map[int]string, error) {
gFile, err := os.Open(gFilePath)
if err != nil {
return nil, fmt.Errorf("opening genesis file: %s", err)
}
defer gFile.Close()

var (
buf = make([]byte, chunkSize)
chunkIDToPath = make(map[int]string)
)
for chunkID := 0; ; chunkID++ {
n, err := gFile.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
break
}

formatStr := "chunk %d: reading genesis file: %w"
return nil, fmt.Errorf(formatStr, chunkID, err)
}

chunkPath, err := writeChunk(buf[:n], gChunksDir, chunkID)
if err != nil {
return nil, fmt.Errorf("chunk %d: %w", chunkID, err)
}

chunkIDToPath[chunkID] = chunkPath
}

return chunkIDToPath, nil
}
Loading

0 comments on commit 48db7ac

Please sign in to comment.