Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
hifi committed Dec 28, 2023
1 parent 4f207c8 commit adc6870
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 58 deletions.
12 changes: 6 additions & 6 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

//"github.com/benbjohnson/litestream/abs"
"github.com/benbjohnson/litestream/file"
//"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/s3"
//"github.com/benbjohnson/litestream/sftp"
_ "github.com/mattn/go-sqlite3"
Expand Down Expand Up @@ -422,11 +422,11 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
if r.Client, err = newS3ReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "gcs":
if r.Client, err = newGCSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
/*
case "gcs":
if r.Client, err = newGCSReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "abs":
if r.Client, err = newABSReplicaClientFromConfig(c, r); err != nil {
return nil, err
Expand Down Expand Up @@ -537,7 +537,6 @@ func newS3ReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *s
return client, nil
}

/*
// newGCSReplicaClientFromConfig returns a new instance of gcs.ReplicaClient built from config.
func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *gcs.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
Expand Down Expand Up @@ -577,6 +576,7 @@ func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *
return client, nil
}

/*
// newABSReplicaClientFromConfig returns a new instance of abs.ReplicaClient built from config.
func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *abs.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
Expand Down
6 changes: 3 additions & 3 deletions cmd/litestream/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/benbjohnson/litestream"
//"github.com/benbjohnson/litestream/abs"
"github.com/benbjohnson/litestream/file"
//"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/s3"
//"github.com/benbjohnson/litestream/sftp"
"github.com/mattn/go-shellwords"
Expand Down Expand Up @@ -115,9 +115,9 @@ func (c *ReplicateCommand) Run() (err error) {
slog.Info("replicating to", "path", client.Path())
case *s3.ReplicaClient:
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region, "endpoint", client.Endpoint)
case *gcs.ReplicaClient:
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path)
/*
case *gcs.ReplicaClient:
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path)
case *abs.ReplicaClient:
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint)
case *sftp.ReplicaClient:
Expand Down
92 changes: 43 additions & 49 deletions gcs/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
}

// WriteSnapshot writes LZ4 compressed data from rd to the object storage.
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) {
func (c *ReplicaClient) WriteSnapshot(ctx context.Context, info *litestream.SnapshotInfo, rd io.Reader) error {
if err := c.Init(ctx); err != nil {
return info, err
return err
}

key, err := litestream.SnapshotPath(c.Path, generation, index)
key, err := litestream.SnapshotPath(c.Path, *info)
if err != nil {
return info, fmt.Errorf("cannot determine snapshot path: %w", err)
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
startTime := time.Now()

Expand All @@ -155,31 +155,28 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in

n, err := io.Copy(w, rd)
if err != nil {
return info, err
return err
} else if err := w.Close(); err != nil {
return info, err
return err
}

internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n))

// log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond))

return litestream.SnapshotInfo{
Generation: generation,
Index: index,
Size: n,
CreatedAt: startTime.UTC(),
}, nil
info.Size = n
info.CreatedAt = startTime.UTC()
return nil
}

// SnapshotReader returns a reader for snapshot data at the given generation/index.
func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) {
func (c *ReplicaClient) SnapshotReader(ctx context.Context, info litestream.SnapshotInfo) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil {
return nil, err
}

key, err := litestream.SnapshotPath(c.Path, generation, index)
key, err := litestream.SnapshotPath(c.Path, info)
if err != nil {
return nil, fmt.Errorf("cannot determine snapshot path: %w", err)
}
Expand All @@ -198,12 +195,12 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i
}

// DeleteSnapshot deletes a snapshot with the given generation & index.
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error {
func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, info litestream.SnapshotInfo) error {
if err := c.Init(ctx); err != nil {
return err
}

key, err := litestream.SnapshotPath(c.Path, generation, index)
key, err := litestream.SnapshotPath(c.Path, info)
if err != nil {
return fmt.Errorf("cannot determine snapshot path: %w", err)
}
Expand All @@ -229,14 +226,14 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
}

// WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) {
func (c *ReplicaClient) WriteWALSegment(ctx context.Context, info *litestream.WALSegmentInfo, rd io.Reader) error {
if err := c.Init(ctx); err != nil {
return info, err
return err
}

key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
key, err := litestream.WALSegmentPath(c.Path, *info)
if err != nil {
return info, fmt.Errorf("cannot determine wal segment path: %w", err)
return fmt.Errorf("cannot determine wal segment path: %w", err)
}
startTime := time.Now()

Expand All @@ -245,31 +242,27 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos,

n, err := io.Copy(w, rd)
if err != nil {
return info, err
return err
} else if err := w.Close(); err != nil {
return info, err
return err
}

internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc()
internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n))

return litestream.WALSegmentInfo{
Generation: pos.Generation,
Index: pos.Index,
Offset: pos.Offset,
Size: n,
CreatedAt: startTime.UTC(),
}, nil
info.Size = n
info.CreatedAt = startTime.UTC()
return nil
}

// WALSegmentReader returns a reader for a section of WAL data at the given index.
// Returns os.ErrNotExist if no matching index/offset is found.
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) {
func (c *ReplicaClient) WALSegmentReader(ctx context.Context, info litestream.WALSegmentInfo) (io.ReadCloser, error) {
if err := c.Init(ctx); err != nil {
return nil, err
}

key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
key, err := litestream.WALSegmentPath(c.Path, info)
if err != nil {
return nil, fmt.Errorf("cannot determine wal segment path: %w", err)
}
Expand All @@ -288,13 +281,13 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos
}

// DeleteWALSegments deletes WAL segments with at the given positions.
func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error {
func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.WALSegmentInfo) error {
if err := c.Init(ctx); err != nil {
return err
}

for _, pos := range a {
key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset)
for _, info := range a {
key, err := litestream.WALSegmentPath(c.Path, info)
if err != nil {
return fmt.Errorf("cannot determine wal segment path: %w", err)
}
Expand Down Expand Up @@ -343,19 +336,20 @@ func (itr *snapshotIterator) Next() bool {
return false
}

info := litestream.SnapshotInfo{
Generation: itr.generation,
Size: attrs.Size,
CreatedAt: attrs.Created.UTC(),
}

// Parse index, otherwise skip to the next object.
index, err := litestream.ParseSnapshotPath(path.Base(attrs.Name))
err = info.ParsePath(path.Base(attrs.Name))
if err != nil {
continue
}

// Store current snapshot and return.
itr.info = litestream.SnapshotInfo{
Generation: itr.generation,
Index: index,
Size: attrs.Size,
CreatedAt: attrs.Created.UTC(),
}
itr.info = info
return true
}
}
Expand Down Expand Up @@ -399,20 +393,20 @@ func (itr *walSegmentIterator) Next() bool {
return false
}

info := litestream.WALSegmentInfo{
Generation: itr.generation,
Size: attrs.Size,
CreatedAt: attrs.Created.UTC(),
}

// Parse index & offset, otherwise skip to the next object.
index, offset, err := litestream.ParseWALSegmentPath(path.Base(attrs.Name))
err = info.ParsePath(path.Base(attrs.Name))
if err != nil {
continue
}

// Store current snapshot and return.
itr.info = litestream.WALSegmentInfo{
Generation: itr.generation,
Index: index,
Offset: offset,
Size: attrs.Size,
CreatedAt: attrs.Created.UTC(),
}
itr.info = info
return true
}
}
Expand Down

0 comments on commit adc6870

Please sign in to comment.