Skip to content

Commit

Permalink
add some logic to avoid close twice
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Oct 17, 2024
1 parent 0d05dd3 commit c0d9b13
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 43 deletions.
19 changes: 11 additions & 8 deletions server/follower_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (fc *followerController) close() error {

if fc.wal != nil {
err = multierr.Append(err, fc.wal.Close())
fc.wal = nil
}

if fc.db != nil {
Expand Down Expand Up @@ -758,17 +759,19 @@ func (fc *followerController) DeleteShard(request *proto.DeleteShardRequest) (*p

fc.log.Info("Deleting shard")

// Wipe out both WAL and DB contents
if err := multierr.Combine(
fc.wal.Delete(),
fc.db.Delete(),
); err != nil {
deleteWal := fc.wal
deleteDb := fc.db

// close the fc first
if err := fc.close(); err != nil {
return nil, err
}

fc.db = nil
fc.wal = nil
if err := fc.close(); err != nil {
// Wipe out both WAL and DB contents
if err := multierr.Combine(
deleteWal.Delete(),
deleteDb.Delete(),
); err != nil {
return nil, err
}

Expand Down
24 changes: 18 additions & 6 deletions server/kv/kv_pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"fmt"
"golang.org/x/net/context"
"io"
"log/slog"
"os"
Expand Down Expand Up @@ -138,6 +139,8 @@ func (p *PebbleFactory) getKVPath(namespace string, shard int64) string {
}

type Pebble struct {
ctx context.Context
cancel context.CancelFunc
factory *PebbleFactory
namespace string
shardId int64
Expand All @@ -162,8 +165,11 @@ type Pebble struct {
}

func newKVPebble(factory *PebbleFactory, namespace string, shardId int64) (KV, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
labels := metrics.LabelsForShard(namespace, shardId)
pb := &Pebble{
ctx: ctx,
cancel: cancelFunc,
factory: factory,
namespace: namespace,
shardId: shardId,
Expand Down Expand Up @@ -342,14 +348,20 @@ func newKVPebble(factory *PebbleFactory, namespace string, shardId int64) (KV, e
}

func (p *Pebble) Close() error {
for _, g := range p.gauges {
g.Unregister()
}
select {
case <-p.ctx.Done():
return nil
default:
p.cancel()
for _, g := range p.gauges {
g.Unregister()
}

if err := p.db.Flush(); err != nil {
return err
if err := p.db.Flush(); err != nil {
return err
}
return p.db.Close()
}
return p.db.Close()
}

func (p *Pebble) Delete() error {
Expand Down
13 changes: 9 additions & 4 deletions server/kv/notifications_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,13 @@ func (nt *notificationsTracker) ReadNextNotifications(ctx context.Context, start
}

func (nt *notificationsTracker) Close() error {
nt.cancel()
nt.closed.Store(true)
nt.cond.Broadcast()
return nt.waitClose.Wait(context.Background())
select {
case <-nt.ctx.Done():
return nil
default:
nt.cancel()
nt.closed.Store(true)
nt.cond.Broadcast()
return nt.waitClose.Wait(context.Background())
}
}
15 changes: 9 additions & 6 deletions server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,16 +1099,19 @@ func (lc *leaderController) DeleteShard(request *proto.DeleteShardRequest) (*pro
}

lc.log.Info("Deleting shard")
deleteWal := lc.wal
deleteDb := lc.db

// Wipe out both WAL and DB contents
if err := multierr.Combine(
lc.wal.Delete(),
lc.db.Delete(),
); err != nil {
// close the leader controller first
if err := lc.close(); err != nil {
return nil, err
}

if err := lc.close(); err != nil {
// Wipe out both WAL and DB contents
if err := multierr.Combine(
deleteWal.Delete(),
deleteDb.Delete(),
); err != nil {
return nil, err
}

Expand Down
31 changes: 16 additions & 15 deletions server/wal/wal_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,25 +224,26 @@ func (t *wal) trim(firstOffset int64) error {
}

func (t *wal) Close() error {
if err := t.trimmer.Close(); err != nil {
return err
}

t.Lock()
defer t.Unlock()

return t.close()
return t.closeWithoutLock()
}

func (t *wal) close() error {
t.cancel()
t.activeEntries.Unregister()

return multierr.Combine(
t.trimmer.Close(),
t.currentSegment.Close(),
t.readOnlySegments.Close(),
)
func (t *wal) closeWithoutLock() error {
select {
case <-t.ctx.Done():
return nil
default:
t.cancel()
t.activeEntries.Unregister()

return multierr.Combine(
t.trimmer.Close(),
t.currentSegment.Close(),
t.readOnlySegments.Close(),
)
}
}

func (t *wal) Append(entry *proto.LogEntry) error {
Expand Down Expand Up @@ -454,7 +455,7 @@ func (t *wal) Delete() error {
defer t.Unlock()

return multierr.Combine(
t.close(),
t.closeWithoutLock(),
os.RemoveAll(t.walPath),
)
}
Expand Down
13 changes: 9 additions & 4 deletions server/wal/wal_trimmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,16 @@ type trimmer struct {
}

func (t *trimmer) Close() error {
t.cancel()
t.ticker.Stop()
select {
case <-t.ctx.Done():
return nil
default:
t.cancel()
t.ticker.Stop()

<-t.waitClose
return nil
<-t.waitClose
return nil
}
}

func (t *trimmer) run() {
Expand Down

0 comments on commit c0d9b13

Please sign in to comment.