From 6d41994ec8f9be6b603c5d19402530b053f816d3 Mon Sep 17 00:00:00 2001 From: Matt Ober <7811297+obo20@users.noreply.github.com> Date: Mon, 16 Aug 2021 17:40:45 -0500 Subject: [PATCH 1/7] Change go-ds-s3 to utilizing prefixes This changes the way the go-ds-s3 plugin stores data in s3 in an effort to drastically improve speed. Instead of storing data all in the /ipfs directory (which counts as one prefix), we're storing things in a way so each block gets its own prefix. --- s3.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/s3.go b/s3.go index 34b2113..4e54b59 100644 --- a/s3.go +++ b/s3.go @@ -105,7 +105,7 @@ func NewS3Datastore(conf Config) (*S3Bucket, error) { func (s *S3Bucket) Put(k ds.Key, value []byte) error { _, err := s.S3.PutObject(&s3.PutObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String())), + Key: aws.String(s.s3Path(k.String() + "/data")), Body: bytes.NewReader(value), }) return err @@ -118,7 +118,7 @@ func (s *S3Bucket) Sync(prefix ds.Key) error { func (s *S3Bucket) Get(k ds.Key) ([]byte, error) { resp, err := s.S3.GetObject(&s3.GetObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String())), + Key: aws.String(s.s3Path(k.String() + "/data")), }) if err != nil { if isNotFound(err) { @@ -145,7 +145,7 @@ func (s *S3Bucket) Has(k ds.Key) (exists bool, err error) { func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { resp, err := s.S3.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String())), + Key: aws.String(s.s3Path(k.String() + "/data")), }) if err != nil { if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NotFound" { @@ -159,7 +159,7 @@ func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { func (s *S3Bucket) Delete(k ds.Key) error { _, err := s.S3.DeleteObject(&s3.DeleteObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String())), + Key: aws.String(s.s3Path(k.String() + "/data")), }) if isNotFound(err) { // delete is idempotent @@ -268,7 +268,7 @@ type batchOp struct { } func (b *s3Batch) Put(k ds.Key, val []byte) error { - b.ops[k.String()] = batchOp{ + b.ops[k.String() + "/data"] = batchOp{ val: val, delete: false, } @@ -276,7 +276,7 @@ func (b *s3Batch) Put(k ds.Key, val []byte) error { } func (b *s3Batch) Delete(k ds.Key) error { - b.ops[k.String()] = batchOp{ + b.ops[k.String() + "/data"] = batchOp{ val: nil, delete: true, } @@ -319,7 +319,7 @@ func (b *s3Batch) Commit() error { } for _, k := range putKeys { - jobs <- b.newPutJob(k, b.ops[k.String()].val) + jobs <- b.newPutJob(k, b.ops[k.String() + "/data"].val) } if len(deleteObjs) > 0 { From d905b506f17a43085f888a98cfc2085eba3252ad Mon Sep 17 00:00:00 2001 From: Matt Ober <7811297+obo20@users.noreply.github.com> Date: Tue, 17 Aug 2021 08:53:18 -0500 Subject: [PATCH 2/7] bug fixes for the new data changes --- s3.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/s3.go b/s3.go index 4e54b59..79b4bc5 100644 --- a/s3.go +++ b/s3.go @@ -268,7 +268,7 @@ type batchOp struct { } func (b *s3Batch) Put(k ds.Key, val []byte) error { - b.ops[k.String() + "/data"] = batchOp{ + b.ops[k.String()] = batchOp{ val: val, delete: false, } @@ -276,7 +276,7 @@ func (b *s3Batch) Put(k ds.Key, val []byte) error { } func (b *s3Batch) Delete(k ds.Key) error { - b.ops[k.String() + "/data"] = batchOp{ + b.ops[k.String()] = batchOp{ val: nil, delete: true, } @@ -319,7 +319,7 @@ func (b *s3Batch) Commit() error { } for _, k := range putKeys { - jobs <- b.newPutJob(k, b.ops[k.String() + "/data"].val) + jobs <- b.newPutJob(k, b.ops[k.String()].val) } if len(deleteObjs) > 0 { From d171f0593f09d18ca109dd3f0f2c5b2b79ab108c Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 21 Aug 2021 10:28:03 -0700 Subject: [PATCH 3/7] make the key tranformation pluggable --- s3.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/s3.go b/s3.go index 79b4bc5..1d3ca82 100644 --- a/s3.go +++ b/s3.go @@ -53,6 +53,16 @@ type Config struct { RootDirectory string Workers int CredentialsEndpoint string + KeyTransform func(ds.Key) string +} + +func DefaultKeyTransform(k ds.Key) string { + return k.String() +} + +// BucketSplitKeyTransform can be used to ensure each cid ends up in its own bucket +func BucketSplitKeyTransform(k ds.Key) string { + return k.String() + "/data" } func NewS3Datastore(conf Config) (*S3Bucket, error) { @@ -60,6 +70,10 @@ func NewS3Datastore(conf Config) (*S3Bucket, error) { conf.Workers = defaultWorkers } + if conf.KeyTransform == nil { + conf.KeyTransform = DefaultKeyTransform + } + awsConfig := aws.NewConfig() sess, err := session.NewSession() if err != nil { @@ -105,7 +119,7 @@ func NewS3Datastore(conf Config) (*S3Bucket, error) { func (s *S3Bucket) Put(k ds.Key, value []byte) error { _, err := s.S3.PutObject(&s3.PutObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + "/data")), + Key: aws.String(s.s3Path(s.Config.KeyTransform(k))), Body: bytes.NewReader(value), }) return err @@ -118,7 +132,7 @@ func (s *S3Bucket) Sync(prefix ds.Key) error { func (s *S3Bucket) Get(k ds.Key) ([]byte, error) { resp, err := s.S3.GetObject(&s3.GetObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + "/data")), + Key: aws.String(s.s3Path(s.Config.KeyTransform(k))), }) if err != nil { if isNotFound(err) { @@ -145,7 +159,7 @@ func (s *S3Bucket) Has(k ds.Key) (exists bool, err error) { func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { resp, err := s.S3.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + "/data")), + Key: aws.String(s.s3Path(s.Config.KeyTransform(k))), }) if err != nil { if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NotFound" { @@ -159,7 +173,7 @@ func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { func (s *S3Bucket) Delete(k ds.Key) error { _, err := s.S3.DeleteObject(&s3.DeleteObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + "/data")), + Key: aws.String(s.s3Path(s.KeyTransform(k))), }) if isNotFound(err) { // delete is idempotent From 7b5dccfa7e356f370a50ba9b050165f27d27c50d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 21 Aug 2021 11:55:50 -0700 Subject: [PATCH 4/7] use a simple suffix config field to make text based configs easier --- s3.go | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/s3.go b/s3.go index 1d3ca82..d622130 100644 --- a/s3.go +++ b/s3.go @@ -53,16 +53,7 @@ type Config struct { RootDirectory string Workers int CredentialsEndpoint string - KeyTransform func(ds.Key) string -} - -func DefaultKeyTransform(k ds.Key) string { - return k.String() -} - -// BucketSplitKeyTransform can be used to ensure each cid ends up in its own bucket -func BucketSplitKeyTransform(k ds.Key) string { - return k.String() + "/data" + KeySuffix string } func NewS3Datastore(conf Config) (*S3Bucket, error) { @@ -70,10 +61,6 @@ func NewS3Datastore(conf Config) (*S3Bucket, error) { conf.Workers = defaultWorkers } - if conf.KeyTransform == nil { - conf.KeyTransform = DefaultKeyTransform - } - awsConfig := aws.NewConfig() sess, err := session.NewSession() if err != nil { @@ -119,7 +106,7 @@ func NewS3Datastore(conf Config) (*S3Bucket, error) { func (s *S3Bucket) Put(k ds.Key, value []byte) error { _, err := s.S3.PutObject(&s3.PutObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(s.Config.KeyTransform(k))), + Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), Body: bytes.NewReader(value), }) return err @@ -132,7 +119,7 @@ func (s *S3Bucket) Sync(prefix ds.Key) error { func (s *S3Bucket) Get(k ds.Key) ([]byte, error) { resp, err := s.S3.GetObject(&s3.GetObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(s.Config.KeyTransform(k))), + Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), }) if err != nil { if isNotFound(err) { @@ -159,7 +146,7 @@ func (s *S3Bucket) Has(k ds.Key) (exists bool, err error) { func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { resp, err := s.S3.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(s.Config.KeyTransform(k))), + Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), }) if err != nil { if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NotFound" { @@ -173,7 +160,7 @@ func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { func (s *S3Bucket) Delete(k ds.Key) error { _, err := s.S3.DeleteObject(&s3.DeleteObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(s.KeyTransform(k))), + Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), }) if isNotFound(err) { // delete is idempotent From 2466cc383ff8ae9cf6b533a3d12b929f0a205969 Mon Sep 17 00:00:00 2001 From: Matt Ober <7811297+obo20@users.noreply.github.com> Date: Sat, 21 Aug 2021 15:21:32 -0500 Subject: [PATCH 5/7] Update s3ds.go --- plugin/s3ds.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugin/s3ds.go b/plugin/s3ds.go index b6faca2..c442bcb 100644 --- a/plugin/s3ds.go +++ b/plugin/s3ds.go @@ -97,6 +97,14 @@ func (s3p S3Plugin) DatastoreConfigParser() fsrepo.ConfigFromMap { return nil, fmt.Errorf("s3ds: credentialsEndpoint not a string") } } + + var keySuffix string + if v, ok := m["keySuffix"]; ok { + keySuffix, ok = v.(string) + if !ok { + return nil, fmt.Errorf("s3ds: keySuffix not a string") + } + } return &S3Config{ cfg: s3ds.Config{ @@ -109,6 +117,7 @@ func (s3p S3Plugin) DatastoreConfigParser() fsrepo.ConfigFromMap { Workers: workers, RegionEndpoint: endpoint, CredentialsEndpoint: credentialsEndpoint, + KeySuffix: keySuffix }, }, nil } From ce9300df1e5704356ce61a8eb02c2df66e9ad9bd Mon Sep 17 00:00:00 2001 From: Matt Ober <7811297+obo20@users.noreply.github.com> Date: Sat, 21 Aug 2021 15:23:32 -0500 Subject: [PATCH 6/7] comma fix comma fix --- plugin/s3ds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/s3ds.go b/plugin/s3ds.go index c442bcb..762df1f 100644 --- a/plugin/s3ds.go +++ b/plugin/s3ds.go @@ -117,7 +117,7 @@ func (s3p S3Plugin) DatastoreConfigParser() fsrepo.ConfigFromMap { Workers: workers, RegionEndpoint: endpoint, CredentialsEndpoint: credentialsEndpoint, - KeySuffix: keySuffix + KeySuffix: keySuffix, }, }, nil } From 8159b3df57f562bfacf5ab0c7e7aaa79ec3abb74 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 30 Nov 2021 17:28:18 -0600 Subject: [PATCH 7/7] updated to key transform function --- s3.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/s3.go b/s3.go index d622130..cf0e94f 100644 --- a/s3.go +++ b/s3.go @@ -56,6 +56,10 @@ type Config struct { KeySuffix string } +func (s *S3Bucket) TransformKey(key string) (newKey string) { + return key + s.Config.KeySuffix +} + func NewS3Datastore(conf Config) (*S3Bucket, error) { if conf.Workers == 0 { conf.Workers = defaultWorkers @@ -106,7 +110,7 @@ func NewS3Datastore(conf Config) (*S3Bucket, error) { func (s *S3Bucket) Put(k ds.Key, value []byte) error { _, err := s.S3.PutObject(&s3.PutObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), + Key: aws.String(s.s3Path(s.TransformKey(k.String()))), Body: bytes.NewReader(value), }) return err @@ -119,7 +123,7 @@ func (s *S3Bucket) Sync(prefix ds.Key) error { func (s *S3Bucket) Get(k ds.Key) ([]byte, error) { resp, err := s.S3.GetObject(&s3.GetObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), + Key: aws.String(s.s3Path(s.TransformKey(k.String()))), }) if err != nil { if isNotFound(err) { @@ -146,7 +150,7 @@ func (s *S3Bucket) Has(k ds.Key) (exists bool, err error) { func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { resp, err := s.S3.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), + Key: aws.String(s.s3Path(s.TransformKey(k.String()))), }) if err != nil { if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NotFound" { @@ -160,7 +164,7 @@ func (s *S3Bucket) GetSize(k ds.Key) (size int, err error) { func (s *S3Bucket) Delete(k ds.Key) error { _, err := s.S3.DeleteObject(&s3.DeleteObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(s.s3Path(k.String() + s.Config.KeySuffix)), + Key: aws.String(s.s3Path(s.TransformKey(k.String()))), }) if isNotFound(err) { // delete is idempotent