Skip to content

Commit

Permalink
refactoring!(share/ipld): move and rename GetLeavesByNamespace
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Mar 23, 2023
1 parent dfbd558 commit 779740a
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 155 deletions.
6 changes: 3 additions & 3 deletions share/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ func GetSharesByNamespace(
defer span.End()

data := ipld.NewNamespaceData(maxShares, nID, ipld.WithLeaves(), ipld.WithProofs())
err := ipld.GetLeavesByNamespace(ctx, bGetter, root, data)
err := data.CollectLeavesByNamespace(ctx, bGetter, root)
if err != nil {
return nil, nil, err
}

leaves := data.CollectLeaves()
leaves := data.Leaves()

shares := make([]Share, len(leaves))
for i, leaf := range leaves {
if leaf != nil {
shares[i] = leafToShare(leaf)
}
}
return shares, data.CollectProof(), err
return shares, data.Proof(), err
}

// leafToShare converts an NMT leaf into a Share.
Expand Down
22 changes: 11 additions & 11 deletions share/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestGetSharesByNamespace(t *testing.T) {
}
}

func TestGetLeavesByNamespace_IncompleteData(t *testing.T) {
func TestCollectLeavesByNamespace_IncompleteData(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bServ := mdutils.Bserv()
Expand Down Expand Up @@ -221,15 +221,15 @@ func TestGetLeavesByNamespace_IncompleteData(t *testing.T) {
err = bServ.DeleteBlock(ctx, r.Cid())
require.NoError(t, err)

rData := ipld.NewNamespaceData(len(shares), nid, ipld.WithLeaves())
err = ipld.GetLeavesByNamespace(ctx, bServ, rcid, rData)
leaves := rData.CollectLeaves()
namespaceData := ipld.NewNamespaceData(len(shares), nid, ipld.WithLeaves())
err = namespaceData.CollectLeavesByNamespace(ctx, bServ, rcid)
leaves := namespaceData.Leaves()
assert.Nil(t, leaves[1])
assert.Equal(t, 4, len(leaves))
require.Error(t, err)
}

func TestGetLeavesByNamespace_AbsentNamespaceId(t *testing.T) {
func TestCollectLeavesByNamespace_AbsentNamespaceId(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bServ := mdutils.Bserv()
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestGetLeavesByNamespace_AbsentNamespaceId(t *testing.T) {
}
}

func TestGetLeavesByNamespace_MultipleRowsContainingSameNamespaceId(t *testing.T) {
func TestCollectLeavesByNamespace_MultipleRowsContainingSameNamespaceId(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bServ := mdutils.Bserv()
Expand All @@ -305,11 +305,11 @@ func TestGetLeavesByNamespace_MultipleRowsContainingSameNamespaceId(t *testing.T
for _, row := range eds.RowRoots() {
rcid := ipld.MustCidFromNamespacedSha256(row)
data := ipld.NewNamespaceData(len(shares), nid, ipld.WithLeaves())
err := ipld.GetLeavesByNamespace(ctx, bServ, rcid, data)
err := data.CollectLeavesByNamespace(ctx, bServ, rcid)
assert.Nil(t, err)
leaves := data.CollectLeaves()
leaves := data.Leaves()
for _, node := range leaves {
// test that the data returned by getLeavesByNamespace for nid
// test that the data returned by collectLeavesByNamespace for nid
// matches the commonNamespaceData that was copied across almost all data
assert.Equal(t, commonNamespaceData, node.RawData()[NamespaceSize:])
}
Expand Down Expand Up @@ -447,8 +447,8 @@ func assertNoRowContainsNID(
// for each row root cid check if the minNID exists
for _, rowCID := range rowRootCIDs {
data := ipld.NewNamespaceData(rowRootCount, nID, ipld.WithProofs())
err := ipld.GetLeavesByNamespace(context.Background(), bServ, rowCID, data)
leaves := data.CollectLeaves()
err := data.CollectLeavesByNamespace(context.Background(), bServ, rowCID)
leaves := data.Leaves()
assert.Nil(t, leaves)
assert.Nil(t, err)
}
Expand Down
137 changes: 1 addition & 136 deletions share/ipld/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
ipld "github.com/ipfs/go-ipld-format"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/celestiaorg/nmt"
)

// NumWorkersLimit sets global limit for workers spawned by GetShares.
Expand Down Expand Up @@ -168,139 +166,6 @@ func GetLeaves(ctx context.Context,
wg.Wait()
}

// GetLeavesByNamespace collects leaves and corresponding proof that could be used to verify leaves
// inclusion. It returns as many leaves from the given root with the given namespace.ID as it can
// retrieve. If no shares are found, it returns error as nil. A
// non-nil error means that only partial data is returned, because at least one share retrieval
// failed. The following implementation is based on `GetShares`.
func GetLeavesByNamespace(
ctx context.Context,
bGetter blockservice.BlockGetter,
root cid.Cid,
data *NamespaceData,
) error {
if err := data.validate(); err != nil {
return err
}

ctx, span := tracer.Start(ctx, "get-leaves-by-namespace")
defer span.End()

span.SetAttributes(
attribute.String("namespace", data.nID.String()),
attribute.String("root", root.String()),
)

// buffer the jobs to avoid blocking, we only need as many
// queued as the number of shares in the second-to-last layer
jobs := make(chan *job, (data.maxShares+1)/2)
jobs <- &job{id: root, ctx: ctx}

var wg chanGroup
wg.jobs = jobs
wg.add(1)

var (
singleErr sync.Once
retrievalErr error
)

for {
var j *job
var ok bool
select {
case j, ok = <-jobs:
case <-ctx.Done():
return ctx.Err()
}

if !ok {
// if there were no leaves under the given root in the given namespace,
// leaves and error will be nil. otherwise, the error will also be non-nil.
if data.noLeaves() {
return retrievalErr
}

return retrievalErr
}
pool.Submit(func() {
ctx, span := tracer.Start(j.ctx, "process-job")
defer span.End()
defer wg.done()

span.SetAttributes(
attribute.String("cid", j.id.String()),
attribute.Int("pos", j.sharePos),
)

// if an error is likely to be returned or not depends on
// the underlying impl of the blockservice, currently it is not a realistic probability
nd, err := GetNode(ctx, bGetter, j.id)
if err != nil {
singleErr.Do(func() {
retrievalErr = err
})
log.Errorw("getLeavesWithProofsByNamespace:could not retrieve node",
"nID", data.nID,
"pos", j.sharePos,
"err", err,
)
span.SetStatus(codes.Error, err.Error())
// we still need to update the bounds
data.addLeaf(j.sharePos, nil)
return
}

links := nd.Links()
if len(links) == 0 {
// successfully fetched a leaf belonging to the namespace
span.SetStatus(codes.Ok, "")
// we found a leaf, so we update the bounds
data.addLeaf(j.sharePos, nd)
return
}

// this node has links in the namespace, so keep walking
for i, lnk := range links {
newJob := &job{
id: lnk.Cid,
// sharePos represents potential share position in share slice
sharePos: j.sharePos*2 + i,
// depth represents the number of edges present in path from the root node of a tree to that node
depth: j.depth + 1,
// we pass the context to job so that spans are tracked in a tree
// structure
ctx: ctx,
}
// if the link's nID isn't in range we don't need to create a new job for it,
// but need to collect a proof
jobNid := NamespacedSha256FromCID(newJob.id)

// proof is on the right side, if the nID is less than min namespace of jobNid
if data.nID.Less(nmt.MinNamespace(jobNid, data.nID.Size())) {
data.addProof(right, lnk.Cid, newJob.depth)
continue
}

// proof is on the left side, if the nID is bigger than max namespace of jobNid
if !data.nID.LessOrEqual(nmt.MaxNamespace(jobNid, data.nID.Size())) {
data.addProof(left, lnk.Cid, newJob.depth)
continue
}

// by passing the previous check, we know we will have one more node to process
// note: it is important to increase the counter before sending to the channel
wg.add(1)
select {
case jobs <- newJob:
case <-ctx.Done():
return
}
}
})
}
}

// GetProof fetches and returns the leaf's Merkle Proof.
// It walks down the IPLD NMT tree until it reaches the leaf and returns collected proof
func GetProof(
Expand Down Expand Up @@ -362,7 +227,7 @@ func (w *chanGroup) done() {
}

// job represents an encountered node to investigate during the `GetLeaves`
// and `GetLeavesByNamespace` routines.
// and `CollectLeavesByNamespace` routines.
type job struct {
id cid.Cid
sharePos int
Expand Down
Loading

0 comments on commit 779740a

Please sign in to comment.