Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2.2.2 eigenda #6

Open
wants to merge 7 commits into
base: eigenda
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions .github/workflows/docker-eigenda.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: Build nitro-eigenda Docker Image
on:
push:
tags: ["*"]

jobs:
docker:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- name: build and push nitro-eigenda
image: nitro-eigenda
dockerfile: Dockerfile
context: .
buildargs: ''
steps:
- name: Checkout
uses: actions/checkout@v3
with:
submodules: "true"

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Prepare Environment Variables
run: |
echo "SHORT_SHA=${GITHUB_SHA::7}" | tee -a $GITHUB_ENV
GIT_TAG=$(git tag --points-at HEAD)
echo "GIT_TAG=$GIT_TAG" | tee -a $GITHUB_ENV
echo "REF_NAME=$(echo ${GIT_TAG:-$GITHUB_REF_NAME} | sed 's/[^a-zA-Z0-9._]/-/g')" | tee -a $GITHUB_ENV

- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.ECR_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.ECR_ACCESS_KEY }}
aws-region: us-west-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: ${{ matrix.name }}
uses: docker/build-push-action@v4
with:
context: ${{ matrix.context }}
push: true
tags: ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ github.sha }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ env.SHORT_SHA }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ env.REF_NAME }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:${{ env.REF_NAME }}-${{ env.SHORT_SHA }} , ${{ secrets.ECR_REGISTRY }}/${{ matrix.image }}:latest
file: ${{ matrix.dockerfile }}
provenance: false
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ COPY ./blsSignatures ./blsSignatures
COPY ./cmd/chaininfo ./cmd/chaininfo
COPY ./cmd/replay ./cmd/replay
COPY ./das/dastree ./das/dastree
COPY ./das/eigenda ./das/eigenda
COPY ./precompiles ./precompiles
COPY ./statetransfer ./statetransfer
COPY ./util ./util
Expand Down
46 changes: 35 additions & 11 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
Expand Down Expand Up @@ -71,6 +72,7 @@ type BatchPoster struct {
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
eigenDAWriter eigenda.EigenDAWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
firstEphemeralError time.Time // first time a continuous error suspected to be ephemeral occurred
Expand Down Expand Up @@ -100,8 +102,9 @@ const (
)

type BatchPosterConfig struct {
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
Enable bool `koanf:"enable"`
DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
DisableEigenDAFallbackStoreDataOnChain bool `koanf:"disable-eigenda-fallback-store-data-on-chain" reload:"hot"`
// Max batch size.
MaxSize int `koanf:"max-size" reload:"hot"`
// Max batch post delay.
Expand Down Expand Up @@ -219,15 +222,16 @@ var TestBatchPosterConfig = BatchPosterConfig{
}

type BatchPosterOpts struct {
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
EigenDAWriter eigenda.EigenDAWriter
}

func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, error) {
Expand Down Expand Up @@ -272,6 +276,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
eigenDAWriter: opts.EigenDAWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
Expand Down Expand Up @@ -1035,6 +1040,25 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
}

if b.daWriter == nil && b.eigenDAWriter != nil {
log.Info("Start to write data to eigenda: ", "data", hex.EncodeToString(sequencerMsg))
daRef, err := b.eigenDAWriter.Store(ctx, sequencerMsg)
if err != nil {
if config.DisableEigenDAFallbackStoreDataOnChain {
log.Warn("Falling back to storing data on chain", "err", err)
return false, errors.New("unable to post batch to EigenDA and fallback storing data on chain is disabled")
}
}

pointer, err := b.eigenDAWriter.Serialize(daRef)
if err != nil {
log.Warn("DaRef serialization failed", "err", err)
return false, errors.New("DaRef serialization failed")
}
log.Info("EigenDA transaction receipt(data pointer): ", "hash", hex.EncodeToString(daRef.BatchHeaderHash), "index", daRef.BlobIndex)
sequencerMsg = pointer
}

data, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil)
tracker, err := NewInboxTracker(db, streamer, nil, nil)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
7 changes: 5 additions & 2 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
)
Expand All @@ -38,12 +39,13 @@ type InboxTracker struct {
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
eigenDA eigenda.EigenDAReader

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
Expand All @@ -52,6 +54,7 @@ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arb
db: db,
txStreamer: txStreamer,
das: das,
eigenDA: eigenDAReader,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
Expand Down Expand Up @@ -603,7 +606,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, arbstate.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, t.eigenDA, arbstate.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
Expand Down
33 changes: 23 additions & 10 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
Expand Down Expand Up @@ -85,6 +86,7 @@ type Config struct {
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
EigenDA eigenda.EigenDAConfig `koanf:"eigen-da"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Expand Down Expand Up @@ -484,6 +486,8 @@ func createNodeImpl(
var daWriter das.DataAvailabilityServiceWriter
var daReader das.DataAvailabilityServiceReader
var dasLifecycleManager *das.LifecycleManager
var eigenDAReader eigenda.EigenDAReader
var eigenDAWriter eigenda.EigenDAWriter
if config.DataAvailability.Enable {
if config.BatchPoster.Enable {
daWriter, daReader, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
Expand All @@ -507,9 +511,16 @@ func createNodeImpl(
}
} else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee {
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
} else if config.EigenDA.Enable {
eigenDAService, err := eigenda.NewEigenDA(config.EigenDA.Rpc)
if err != nil {
return nil, err
}
eigenDAReader = eigenDAService
eigenDAWriter = eigenDAService
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, eigenDAReader)
if err != nil {
return nil, err
}
Expand All @@ -528,6 +539,7 @@ func createNodeImpl(
exec,
rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
daReader,
eigenDAReader,
func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
stack,
)
Expand Down Expand Up @@ -634,15 +646,16 @@ func createNodeImpl(
return nil, errors.New("batchposter, but no TxOpts")
}
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
EigenDAWriter: eigenDAWriter,
})
if err != nil {
return nil, err
Expand Down
27 changes: 24 additions & 3 deletions arbstate/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"io"
"math/big"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/offchainlabs/nitro/arbos/l1pricing"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/das/eigenda"
"github.com/offchainlabs/nitro/zeroheavy"
)

Expand Down Expand Up @@ -50,7 +52,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64
const MaxSegmentsPerSequencerMessage = 100 * 1024
const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week

func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
if len(data) < 40 {
return nil, errors.New("sequencer message missing L1 header")
}
Expand All @@ -63,6 +65,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da
segments: [][]byte{},
}
payload := data[40:]
log.Info("Inbox parse sequencer message: ", "payload", hex.EncodeToString(payload))

if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) {
if dasReader == nil {
Expand All @@ -79,6 +82,22 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, data []byte, da
}
}

// detect eigenda message from byte
if len(payload) > 0 && eigenda.IsEigenDAMessageHeaderByte(payload[0]) {
if eigenDAReader == nil {
log.Error("No EigenDA Reader configured, but sequencer message found with EigenDA header")
} else {
var err error
payload, err = eigenda.RecoverPayloadFromEigenDABatch(ctx, payload[1:], eigenDAReader, nil)
if err != nil {
return nil, err
}
if payload == nil {
return parsedMsg, nil
}
}
}

if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) {
pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen)))
if err != nil {
Expand Down Expand Up @@ -242,6 +261,7 @@ type inboxMultiplexer struct {
backend InboxBackend
delayedMessagesRead uint64
dasReader DataAvailabilityReader
eigenDAReader eigenda.EigenDAReader
cachedSequencerMessage *sequencerMessage
cachedSequencerMessageNum uint64
cachedSegmentNum uint64
Expand All @@ -251,11 +271,12 @@ type inboxMultiplexer struct {
keysetValidationMode KeysetValidationMode
}

func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, eigenDAReader eigenda.EigenDAReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
return &inboxMultiplexer{
backend: backend,
delayedMessagesRead: delayedMessagesRead,
dasReader: dasReader,
eigenDAReader: eigenDAReader,
keysetValidationMode: keysetValidationMode,
}
}
Expand All @@ -276,7 +297,7 @@ func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMeta
}
r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition()
var err error
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.keysetValidationMode)
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, bytes, r.dasReader, r.eigenDAReader, r.keysetValidationMode)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion arbstate/inbox_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func FuzzInboxMultiplexer(f *testing.F) {
delayedMessage: delayedMsg,
positionWithinMessage: 0,
}
multiplexer := NewInboxMultiplexer(backend, 0, nil, KeysetValidate)
multiplexer := NewInboxMultiplexer(backend, 0, nil, nil, KeysetValidate)
_, err := multiplexer.Pop(context.TODO())
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func mainImpl() int {
return 1
}

log.Info("Running Arbitrum nitro node", "revision", vcsRevision, "vcs.time", vcsTime)
log.Info("Running Arbitrum nitro node with eigenda integration", "revision", vcsRevision, "vcs.time", vcsTime)

if nodeConfig.Node.Dangerous.NoL1Listener {
nodeConfig.Node.ParentChainReader.Enable = false
Expand Down
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading