From 7c22360cc3afc3e3f9acb9a1e2faaea46d600658 Mon Sep 17 00:00:00 2001 From: NoAccident Date: Wed, 27 Sep 2023 22:33:28 +0800 Subject: [PATCH] update 3.23.9 --- README.MD | 42 ++++++- README_CN.MD | 34 ++++++ main/obs_go_sample.go | 136 ++++++++++++++++++++-- obs/auth.go | 33 +++++- obs/authV4.go | 1 - obs/callback.go | 1 - obs/client_object.go | 67 +++++++---- obs/conf.go | 68 ++++++++++- obs/const.go | 19 +++- obs/convert.go | 56 +++++++-- obs/extension.go | 5 + obs/http.go | 189 ++++++++++++++++++------------- obs/log.go | 42 +++++++ obs/mime.go | 6 +- obs/model_base.go | 35 ++++-- obs/model_bucket.go | 20 ++-- obs/model_header.go | 1 + obs/model_object.go | 3 +- obs/model_other.go | 2 + obs/model_part.go | 1 + obs/model_response.go | 1 + obs/progress.go | 101 +++++++++++++++++ obs/provider.go | 1 - obs/temporary_createSignedUrl.go | 2 +- obs/temporary_signedUrl.go | 8 +- obs/trait_base.go | 4 + obs/trait_bucket.go | 7 ++ obs/trait_object.go | 21 ++-- obs/transfer.go | 57 ++++++++-- obs/util.go | 39 ++++++- 30 files changed, 827 insertions(+), 175 deletions(-) create mode 100644 obs/progress.go diff --git a/README.MD b/README.MD index 060ac9b..58ebfab 100644 --- a/README.MD +++ b/README.MD @@ -1,16 +1,52 @@ -Version 3.21.12 +Version 3.23.9 New Features: -1. Added the `obs.WithTrafficLimitHeader` method to limit the bandwidth of a single connection. +1. Added the obs.WithProgress method to support progress bar callback. +2. Added fragment expiration time in configuration life cycle management rules. Documentation & Demo: -1. Added descriptions about single-connection bandwidth throttling. + +----------------------------------------------------------------------------------- + +Version 3.23.4 + +New Features: +1. Added the obs.WithCustomHeader method when upload. +2. Added bucket customDomain-related APIs, including SetBucketCustomDomain, GetBucketCustomDomain, and DeleteBucketCustomDomain. +3. Added bucket mirrorBackToSource-related APIs, including SetBucketMirrorBackToSource, GetBucketMirrorBackToSource, and DeleteBucketMirrorBackToSource. + +Documentation & Demo: +1. Added descriptions about bucket customDomain APIs. +2. Added descriptions about bucket mirrorBackToSource APIs. + +----------------------------------------------------------------------------------- +Version 3.23.3 + +New Features: +1. Added the obs.WithCallbackHeader method when upload. + +Documentation & Demo: +1. Added descriptions about callback when upload. 2. Added descriptions about extended configurations of SDK APIs. Resolved Issues: 1. Optimized some code. ----------------------------------------------------------------------------------- +Version 3.21.12 + +New Features: +1. Added obs.WithTrafficLimitHeader method to realize single-connection bandwidth throttling. + +Documentation & Demo: +1. Added descriptions about single-connection bandwidth throttling. +2. Added descriptions about the SDK interface extension configuration. + +Resolved Issues: +1. Optimized part of the code. + +----------------------------------------------------------------------------------- + Version 3.21.8 New Features: diff --git a/README_CN.MD b/README_CN.MD index 90f2f01..8dd6c8e 100644 --- a/README_CN.MD +++ b/README_CN.MD @@ -1,3 +1,37 @@ +Version 3.23.9 + +新特性: +1. 新增obs.WithProgress方法支持进度条回调。 +2. 新增配置生命周期管理规则中的碎片过期时间。 + +资料&demo: + +----------------------------------------------------------------------------------- +Version 3.23.4 + +新特性: +1. 新增obs.WithCustomHeader方法。 +2. 新增自定义域名相关API,包括SetBucketCustomDomain,GetBucketCustomDomain,DeleteBucketCustomDomain。 +3. 新增镜像回源相关API,包括SetBucketMirrorBackToSource、GetBucketMirrorBackToSource、DeleteBucketMirrorBackToSource。 + +资料&demo: +1. 补充自定义域名相关API的描述。 +2. 补充镜像回源相关API的描述。 + +----------------------------------------------------------------------------------- +Version 3.23.3 + +新特性: +1. 新增obs.WithCallbackHeader方法; + +资料&demo: +1. 补充上传时回调的说明。 +2. 补充SDK API扩展配置说明。 + +修复问题: +1. 优化部分代码实现 + +----------------------------------------------------------------------------------- Version 3.21.12 新特性: diff --git a/main/obs_go_sample.go b/main/obs_go_sample.go index 08265aa..f05e71f 100644 --- a/main/obs_go_sample.go +++ b/main/obs_go_sample.go @@ -359,7 +359,7 @@ func setBucketCors() { var corsRules [2]obs.CorsRule corsRule0 := obs.CorsRule{} corsRule0.ID = "rule1" - corsRule0.AllowedOrigin = []string{"http://www.a.com", "http://www.b.com"} + corsRule0.AllowedOrigin = []string{"***"} corsRule0.AllowedMethod = []string{"GET", "PUT", "POST", "HEAD"} corsRule0.AllowedHeader = []string{"header1", "header2"} corsRule0.MaxAgeSeconds = 100 @@ -368,7 +368,7 @@ func setBucketCors() { corsRule1 := obs.CorsRule{} corsRule1.ID = "rule2" - corsRule1.AllowedOrigin = []string{"http://www.c.com", "http://www.d.com"} + corsRule1.AllowedOrigin = []string{"***"} corsRule1.AllowedMethod = []string{"GET", "PUT", "POST", "HEAD"} corsRule1.AllowedHeader = []string{"header3", "header4"} corsRule1.MaxAgeSeconds = 50 @@ -580,7 +580,7 @@ func getBucketLoggingConfiguration() { func setBucketWebsiteConfiguration() { input := &obs.SetBucketWebsiteConfigurationInput{} input.Bucket = bucketName - // input.RedirectAllRequestsTo.HostName = "www.a.com" + // input.RedirectAllRequestsTo.HostName = "***" // input.RedirectAllRequestsTo.Protocol = obs.ProtocolHttp input.IndexDocument.Suffix = "suffix" input.ErrorDocument.Key = "key" @@ -588,7 +588,7 @@ func setBucketWebsiteConfiguration() { var routingRules [2]obs.RoutingRule routingRule0 := obs.RoutingRule{} - routingRule0.Redirect.HostName = "www.a.com" + routingRule0.Redirect.HostName = "***" routingRule0.Redirect.Protocol = obs.ProtocolHttp routingRule0.Redirect.ReplaceKeyPrefixWith = "prefix" routingRule0.Redirect.HttpRedirectCode = "304" @@ -596,7 +596,7 @@ func setBucketWebsiteConfiguration() { routingRule1 := obs.RoutingRule{} - routingRule1.Redirect.HostName = "www.b.com" + routingRule1.Redirect.HostName = "***" routingRule1.Redirect.Protocol = obs.ProtocolHttps routingRule1.Redirect.ReplaceKeyWith = "replaceKey" routingRule1.Redirect.HttpRedirectCode = "304" @@ -822,7 +822,7 @@ func deleteBucketMirrorBackToSource() { func setBucketCustomDomain() { input := &obs.SetBucketCustomDomainInput{} input.Bucket = bucketName - input.CustomDomain = "www.example.com" + input.CustomDomain = "***" output, err := getObsClient().SetBucketCustomDomain(input) if err == nil { @@ -860,7 +860,7 @@ func getBucketCustomDomain() { func deleteBucketCustomdomain() { input := &obs.DeleteBucketCustomDomainInput{} input.Bucket = bucketName - input.CustomDomain = "www.test-go4444.com" + input.CustomDomain = "***" output, err := getObsClient().DeleteBucketCustomDomain(input) if err == nil { @@ -1602,6 +1602,126 @@ func renameFolder() { } +// 定义进度条监听器。 +type ObsProgressListener struct { +} + +// 定义进度变更事件处理函数。 +func (listener *ObsProgressListener) ProgressChanged(event *obs.ProgressEvent) { + switch event.EventType { + case obs.TransferStartedEvent: + fmt.Printf("Transfer Started, ConsumedBytes: %d, TotalBytes %d.\n", + event.ConsumedBytes, event.TotalBytes) + case obs.TransferDataEvent: + fmt.Printf("\rTransfer Data, ConsumedBytes: %d, TotalBytes %d, %d%%.\n", + event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes) + case obs.TransferCompletedEvent: + fmt.Printf("\nTransfer Completed, ConsumedBytes: %d, TotalBytes %d.\n", + event.ConsumedBytes, event.TotalBytes) + case obs.TransferFailedEvent: + fmt.Printf("\nTransfer Failed, ConsumedBytes: %d, TotalBytes %d.\n", + event.ConsumedBytes, event.TotalBytes) + default: + } +} + +func getObjectWithProgress() { + input := &obs.GetObjectInput{} + input.Bucket = bucketName + input.Key = objectKey + output, err := getObsClient().GetObject(input, obs.WithProgress(&ObsProgressListener{})) + if err == nil { + defer output.Body.Close() + fmt.Printf("StatusCode:%d, RequestId:%s\n", output.StatusCode, output.RequestId) + fmt.Printf("StorageClass:%s, ETag:%s, ContentType:%s, ContentLength:%d, LastModified:%s\n", + output.StorageClass, output.ETag, output.ContentType, output.ContentLength, output.LastModified) + p := make([]byte, 1024) + var readErr error + var readCount int + for { + readCount, readErr = output.Body.Read(p) + if readCount > 0 { + fmt.Printf("%s", p[:readCount]) + } + if readErr != nil { + break + } + } + } else { + if obsError, ok := err.(obs.ObsError); ok { + fmt.Println(obsError.StatusCode) + fmt.Println(obsError.Code) + fmt.Println(obsError.Message) + } else { + fmt.Println(err) + } + } +} + +func putObjectWithProgress() { + input := &obs.PutObjectInput{} + input.Bucket = bucketName + input.Key = objectKey + input.Metadata = map[string]string{"meta": "value"} + input.Body = strings.NewReader("Hello OBS") + output, err := getObsClient().PutObject(input, obs.WithProgress(&ObsProgressListener{})) + if err == nil { + fmt.Printf("StatusCode:%d, RequestId:%s\n", output.StatusCode, output.RequestId) + fmt.Printf("ETag:%s, StorageClass:%s\n", output.ETag, output.StorageClass) + } else { + if obsError, ok := err.(obs.ObsError); ok { + fmt.Println(obsError.StatusCode) + fmt.Println(obsError.Code) + fmt.Println(obsError.Message) + } else { + fmt.Println(err) + } + } +} + +func putFileWithProgress() { + input := &obs.PutFileInput{} + input.Bucket = bucketName + input.Key = objectKey + input.SourceFile = "localfile" + output, err := getObsClient().PutFile(input, obs.WithProgress(&ObsProgressListener{})) + if err == nil { + fmt.Printf("StatusCode:%d, RequestId:%s\n", output.StatusCode, output.RequestId) + fmt.Printf("ETag:%s, StorageClass:%s\n", output.ETag, output.StorageClass) + } else { + if obsError, ok := err.(obs.ObsError); ok { + fmt.Println(obsError.StatusCode) + fmt.Println(obsError.Code) + fmt.Println(obsError.Message) + } else { + fmt.Println(err) + } + } +} + +func appendObjectWithProgress() { + input := &obs.AppendObjectInput{} + input.Bucket = bucketName + input.Key = objectKey + input.Position = 9 + input.Body = strings.NewReader("Hello OBS") + output, err := getObsClient().AppendObject(input, obs.WithProgress(&ObsProgressListener{})) + if err == nil { + fmt.Printf("Append object(%s) under the bucket(%s) successful!\n", input.Key, input.Bucket) + fmt.Printf("ETag:%s, NextAppendPosition:%d\n", output.ETag, output.NextAppendPosition) + return + } + fmt.Printf("Append objects under the bucket(%s) fail!\n", input.Bucket) + if obsError, ok := err.(obs.ObsError); ok { + fmt.Println("An ObsError was found, which means your request sent to OBS was rejected with an error response.") + fmt.Println(obsError.Error()) + } else { + fmt.Println("An Exception was found, which means the client encountered an internal problem when attempting to communicate with OBS, for example, the client was unable to access the network.") + fmt.Println(err) + } + +} + func runExamples() { examples.RunBucketOperationsSample() // examples.RunObjectOperationsSample() @@ -1696,9 +1816,11 @@ func main() { // completeMultipartUpload() // abortMultipartUpload() // putObject() + // putObjectWithProgress() // putFile() // getObjectMetadata() // getObject() + // getObjectWithProgress() // putObjectWithCallback() // deleteBucket() } diff --git a/obs/auth.go b/obs/auth.go index 00c74cf..8a964d5 100644 --- a/obs/auth.go +++ b/obs/auth.go @@ -20,7 +20,16 @@ import ( "time" ) -func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string, params map[string]string, +func setURLWithPolicy(bucketName, canonicalizedUrl string) string { + if strings.HasPrefix(canonicalizedUrl, "/"+bucketName+"/") { + canonicalizedUrl = canonicalizedUrl[len("/"+bucketName+"/"):] + } else if strings.HasPrefix(canonicalizedUrl, "/"+bucketName) { + canonicalizedUrl = canonicalizedUrl[len("/"+bucketName):] + } + return canonicalizedUrl +} + +func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string, policy string, params map[string]string, headers map[string][]string, expires int64) (requestURL string, err error) { sh := obsClient.getSecurity() isAkSkEmpty := sh.ak == "" || sh.sk == "" @@ -31,6 +40,11 @@ func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string, params[HEADER_STS_TOKEN_AMZ] = sh.securityToken } } + + if policy != "" { + objectKey = "" + } + requestURL, canonicalizedURL := obsClient.conf.formatUrls(bucketName, objectKey, params, true) parsedRequestURL, err := url.Parse(requestURL) if err != nil { @@ -93,7 +107,13 @@ func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string, return "", parseDateErr } expires += date.Unix() - headers[HEADER_DATE_CAMEL] = []string{Int64ToString(expires)} + if policy == "" { + headers[HEADER_DATE_CAMEL] = []string{Int64ToString(expires)} + } else { + policy = Base64Encode([]byte(policy)) + headers[HEADER_DATE_CAMEL] = []string{policy} + canonicalizedURL = setURLWithPolicy(bucketName, canonicalizedURL) + } stringToSign := getV2StringToSign(method, canonicalizedURL, headers, obsClient.conf.signature == SignatureObs) signature := UrlEncode(Base64Encode(HmacSha1([]byte(sh.sk), []byte(stringToSign))), false) @@ -107,7 +127,14 @@ func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string, if obsClient.conf.signature != SignatureObs { requestURL += "AWS" } - requestURL += fmt.Sprintf("AccessKeyId=%s&Expires=%d&Signature=%s", UrlEncode(sh.ak, false), expires, signature) + if policy == "" { + requestURL += fmt.Sprintf("AccessKeyId=%s&Expires=%d&Signature=%s", UrlEncode(sh.ak, false), + expires, signature) + return + + } + requestURL += fmt.Sprintf("AccessKeyId=%s&Policy=%s&Signature=%s", UrlEncode(sh.ak, false), + UrlEncode(policy, false), signature) } } diff --git a/obs/authV4.go b/obs/authV4.go index 70abe96..258b342 100644 --- a/obs/authV4.go +++ b/obs/authV4.go @@ -76,7 +76,6 @@ func getV4StringToSign(method, canonicalizedURL, queryURL, scope, longDate, payl _stringToSign := strings.Join(stringToSign, "") - doLog(LEVEL_DEBUG, "The v4 auth stringToSign:\n%s", _stringToSign) return _stringToSign } diff --git a/obs/callback.go b/obs/callback.go index b2e41b7..1358e78 100644 --- a/obs/callback.go +++ b/obs/callback.go @@ -10,7 +10,6 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -//nolint:structcheck, unused package obs import ( diff --git a/obs/client_object.go b/obs/client_object.go index fecdb60..7835e98 100644 --- a/obs/client_object.go +++ b/obs/client_object.go @@ -223,12 +223,32 @@ func (obsClient ObsClient) GetObject(input *GetObjectInput, extensions ...extens return nil, errors.New("GetObjectInput is nil") } output = &GetObjectOutput{} - err = obsClient.doActionWithBucketAndKey("GetObject", HTTP_GET, input.Bucket, input.Key, input, output, extensions) + err = obsClient.doActionWithBucketAndKeyWithProgress(GET_OBJECT, HTTP_GET, input.Bucket, input.Key, input, output, extensions, nil) if err != nil { output = nil - } else { - ParseGetObjectOutput(output) + return + } + + ParseGetObjectOutput(output) + listener := obsClient.getProgressListener(extensions) + if listener != nil { + output.Body = TeeReader(output.Body, output.ContentLength, listener, nil) + } + return +} + +func (obsClient ObsClient) GetObjectWithoutProgress(input *GetObjectInput, extensions ...extensionOptions) (output *GetObjectOutput, err error) { + if input == nil { + return nil, errors.New("GetObjectInput is nil") } + output = &GetObjectOutput{} + err = obsClient.doActionWithBucketAndKeyWithProgress(GET_OBJECT, HTTP_GET, input.Bucket, input.Key, input, output, extensions, nil) + if err != nil { + output = nil + return + } + + ParseGetObjectOutput(output) return } @@ -239,7 +259,7 @@ func (obsClient ObsClient) PutObject(input *PutObjectInput, extensions ...extens } if input.ContentType == "" && input.Key != "" { - if contentType, ok := mimeTypes[strings.ToLower(input.Key[strings.LastIndex(input.Key, ".")+1:])]; ok { + if contentType, ok := GetContentType(input.Key); ok { input.ContentType = contentType } } @@ -253,24 +273,27 @@ func (obsClient ObsClient) PutObject(input *PutObjectInput, extensions ...extens input.Body = &readerWrapper{reader: input.Body, totalCount: input.ContentLength} } } + + listener := obsClient.getProgressListener(extensions) if repeatable { - err = obsClient.doActionWithBucketAndKey("PutObject", HTTP_PUT, input.Bucket, input.Key, input, output, extensions) + err = obsClient.doActionWithBucketAndKeyWithProgress(PUT_OBJECT, HTTP_PUT, input.Bucket, input.Key, input, output, extensions, listener) } else { - err = obsClient.doActionWithBucketAndKeyUnRepeatable("PutObject", HTTP_PUT, input.Bucket, input.Key, input, output, extensions) + err = obsClient.doActionWithBucketAndKeyUnRepeatableWithProgress(PUT_OBJECT, HTTP_PUT, input.Bucket, input.Key, input, output, extensions, listener) } if err != nil { output = nil - } else { - ParsePutObjectOutput(output) + return } + ParsePutObjectOutput(output) + output.ObjectUrl = fmt.Sprintf("%s/%s/%s", obsClient.conf.endpoint, input.Bucket, input.Key) return } func (obsClient ObsClient) getContentType(input *PutObjectInput, sourceFile string) (contentType string) { - if contentType, ok := mimeTypes[strings.ToLower(input.Key[strings.LastIndex(input.Key, ".")+1:])]; ok { + if contentType, ok := GetContentType(input.Key); ok { return contentType } - if contentType, ok := mimeTypes[strings.ToLower(sourceFile[strings.LastIndex(sourceFile, ".")+1:])]; ok { + if contentType, ok := GetContentType(sourceFile); ok { return contentType } return @@ -349,14 +372,17 @@ func (obsClient ObsClient) PutFile(input *PutFileInput, extensions ...extensionO if obsClient.isGetContentType(_input) { _input.ContentType = obsClient.getContentType(_input, sourceFile) } - + listener := obsClient.getProgressListener(extensions) output = &PutObjectOutput{} - err = obsClient.doActionWithBucketAndKey("PutFile", HTTP_PUT, _input.Bucket, _input.Key, _input, output, extensions) + err = obsClient.doActionWithBucketAndKeyWithProgress(PUT_FILE, HTTP_PUT, _input.Bucket, _input.Key, _input, output, extensions, listener) + if err != nil { output = nil - } else { - ParsePutObjectOutput(output) + return } + + ParsePutObjectOutput(output) + output.ObjectUrl = fmt.Sprintf("%s/%s/%s", obsClient.conf.endpoint, input.Bucket, input.Key) return } @@ -405,17 +431,16 @@ func (obsClient ObsClient) AppendObject(input *AppendObjectInput, extensions ... input.Body = &readerWrapper{reader: input.Body, totalCount: input.ContentLength} } } + listener := obsClient.getProgressListener(extensions) + if repeatable { - err = obsClient.doActionWithBucketAndKey("AppendObject", HTTP_POST, input.Bucket, input.Key, input, output, extensions) + err = obsClient.doActionWithBucketAndKeyWithProgress(APPEND_OBJECT, HTTP_POST, input.Bucket, input.Key, input, output, extensions, listener) } else { - err = obsClient.doActionWithBucketAndKeyUnRepeatable("AppendObject", HTTP_POST, input.Bucket, input.Key, input, output, extensions) + err = obsClient.doActionWithBucketAndKeyUnRepeatableWithProgress(APPEND_OBJECT, HTTP_POST, input.Bucket, input.Key, input, output, extensions, listener) } - if err != nil { + + if err != nil || ParseAppendObjectOutput(output) != nil { output = nil - } else { - if err = ParseAppendObjectOutput(output); err != nil { - output = nil - } } return } diff --git a/obs/conf.go b/obs/conf.go index e4e27c9..74a3ad8 100644 --- a/obs/conf.go +++ b/obs/conf.go @@ -21,10 +21,14 @@ import ( "net" "net/http" "net/url" + "os" "sort" "strconv" "strings" + "sync" "time" + + "golang.org/x/net/http/httpproxy" ) type urlHolder struct { @@ -49,6 +53,8 @@ type config struct { finalTimeout int maxRetryCount int proxyURL string + noProxyURL string + proxyFromEnv bool maxConnsPerHost int pemCerts []byte transport *http.Transport @@ -58,6 +64,10 @@ type config struct { maxRedirectCount int userAgent string enableCompression bool + progressListener ProgressListener + + customProxyOnce sync.Once + customProxyFuncValue func(*url.URL) (*url.URL, error) } func (conf config) String() string { @@ -109,6 +119,20 @@ func WithProxyUrl(proxyURL string) configurer { } } +// WithNoProxyUrl is a configurer for ObsClient to set HTTP no_proxy. +func WithNoProxyUrl(noProxyURL string) configurer { + return func(conf *config) { + conf.noProxyURL = noProxyURL + } +} + +// WithProxyFromEnv is a configurer for ObsClient to get proxy from evironment. +func WithProxyFromEnv(proxyFromEnv bool) configurer { + return func(conf *config) { + conf.proxyFromEnv = proxyFromEnv + } +} + // WithMaxConnections is a configurer for ObsClient to set the maximum number of idle HTTP connections. func WithMaxConnections(maxConnsPerHost int) configurer { return func(conf *config) { @@ -342,13 +366,10 @@ func (conf *config) getTransport() error { ResponseHeaderTimeout: time.Second * time.Duration(conf.headerTimeout), IdleConnTimeout: time.Second * time.Duration(conf.idleConnTimeout), } - if conf.proxyURL != "" { - proxyURL, err := url.Parse(conf.proxyURL) - if err != nil { - return err - } - conf.transport.Proxy = http.ProxyURL(proxyURL) + conf.transport.Proxy = conf.customProxyFromEnvironment + } else if conf.proxyFromEnv { + conf.transport.Proxy = http.ProxyFromEnvironment } tlsConfig := &tls.Config{InsecureSkipVerify: !conf.sslVerify} @@ -365,6 +386,24 @@ func (conf *config) getTransport() error { return nil } +func (conf *config) customProxyFromEnvironment(req *http.Request) (*url.URL, error) { + url, err := conf.customProxyFunc()(req.URL) + return url, err +} + +func (conf *config) customProxyFunc() func(*url.URL) (*url.URL, error) { + conf.customProxyOnce.Do(func() { + customhttpproxy := &httpproxy.Config{ + HTTPProxy: conf.proxyURL, + HTTPSProxy: conf.proxyURL, + NoProxy: conf.noProxyURL, + CGI: os.Getenv("REQUEST_METHOD") != "", + } + conf.customProxyFuncValue = customhttpproxy.ProxyFunc() + }) + return conf.customProxyFuncValue +} + func checkRedirectFunc(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse } @@ -503,3 +542,20 @@ func getQueryURL(key, value string) string { queryURL += value return queryURL } + +var once sync.Once + +func (obsClient ObsClient) GetClientConfigure(extensions []extensionOptions) *config { + once.Do(func() { + for _, extension := range extensions { + if configure, ok := extension.(configurer); ok { + configure(obsClient.conf) + } + } + }) + return obsClient.conf +} + +func (obsClient ObsClient) getProgressListener(extensions []extensionOptions) ProgressListener { + return obsClient.GetClientConfigure(extensions).progressListener +} diff --git a/obs/const.go b/obs/const.go index b04db63..acb1e07 100644 --- a/obs/const.go +++ b/obs/const.go @@ -49,6 +49,8 @@ const ( HEADER_GRANT_READ_DELIVERED_OBS = "grant-read-delivered" HEADER_GRANT_FULL_CONTROL_DELIVERED_OBS = "grant-full-control-delivered" HEADER_REQUEST_ID = "request-id" + HEADER_ERROR_CODE = "error-code" + HEADER_ERROR_MESSAGE = "error-message" HEADER_BUCKET_REGION = "bucket-region" HEADER_ACCESS_CONRTOL_ALLOW_ORIGIN = "access-control-allow-origin" HEADER_ACCESS_CONRTOL_ALLOW_HEADERS = "access-control-allow-headers" @@ -130,6 +132,7 @@ const ( HEADER_CONTENT_ENCODING_CAMEL = "Content-Encoding" HEADER_CONTENT_LANGUAGE_CAMEL = "Content-Language" HEADER_EXPIRES_CAMEL = "Expires" + HEADER_ACCEPT_ENCODING = "Accept-Encoding" PARAM_VERSION_ID = "versionId" PARAM_RESPONSE_CONTENT_TYPE = "response-content-type" @@ -157,7 +160,6 @@ const ( DEFAULT_MAX_RETRY_COUNT = 3 DEFAULT_MAX_REDIRECT_COUNT = 3 DEFAULT_MAX_CONN_PER_HOST = 1000 - EMPTY_CONTENT_SHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD" LONG_DATE_FORMAT = "20060102T150405Z" SHORT_DATE_FORMAT = "20060102" @@ -195,6 +197,11 @@ const ( MIN_PART_SIZE = 100 * 1024 DEFAULT_PART_SIZE = 9 * 1024 * 1024 MAX_PART_NUM = 10000 + + GET_OBJECT = "GetObject" + PUT_OBJECT = "PutObject" + PUT_FILE = "PutFile" + APPEND_OBJECT = "AppendObject" ) var ( @@ -224,6 +231,16 @@ var ( "if-none-match": true, "last-modified": true, "content-range": true, + "accept-encoding": true, + } + + allowedLogResponseHTTPHeaderNames = map[string]bool{ + "content-type": true, + "etag": true, + "connection": true, + "content-length": true, + "date": true, + "server": true, } allowedResourceParameterNames = map[string]bool{ diff --git a/obs/convert.go b/obs/convert.go index f08dc42..97bedf5 100644 --- a/obs/convert.go +++ b/obs/convert.go @@ -348,11 +348,24 @@ func convertTransitionsToXML(transitions []Transition, isObs bool) string { return "" } +func converLifeCycleFilterToXML(filter LifecycleFilter) string { + if filter.Prefix == "" && len(filter.Tags) == 0 { + return "" + } + data, err := TransToXml(filter) + if err != nil { + return "" + } + return string(data) +} + func convertExpirationToXML(expiration Expiration) string { if expiration.Days > 0 { return fmt.Sprintf("%d", expiration.Days) } else if !expiration.Date.IsZero() { return fmt.Sprintf("%s", expiration.Date.UTC().Format(ISO8601_MIDNIGHT_DATE_FORMAT)) + } else if expiration.ExpiredObjectDeleteMarker != "" { + return fmt.Sprintf("%s", expiration.ExpiredObjectDeleteMarker) } return "" } @@ -385,6 +398,13 @@ func convertNoncurrentVersionExpirationToXML(noncurrentVersionExpiration Noncurr return "" } +func convertAbortIncompleteMultipartUploadToXML(abortIncompleteMultipartUpload AbortIncompleteMultipartUpload) string { + if abortIncompleteMultipartUpload.DaysAfterInitiation > 0 { + return fmt.Sprintf("%d", abortIncompleteMultipartUpload.DaysAfterInitiation) + } + return "" +} + // ConvertLifecyleConfigurationToXml converts BucketLifecyleConfiguration value to XML data and returns it func ConvertLifecyleConfigurationToXml(input BucketLifecyleConfiguration, returnMd5 bool, isObs bool) (data string, md5 string) { xml := make([]string, 0, 2+len(input.LifecycleRules)*9) @@ -396,8 +416,13 @@ func ConvertLifecyleConfigurationToXml(input BucketLifecyleConfiguration, return xml = append(xml, fmt.Sprintf("%s", lifecyleRuleID)) } lifecyleRulePrefix := XmlTranscoding(lifecyleRule.Prefix) - xml = append(xml, fmt.Sprintf("%s", lifecyleRulePrefix)) + if lifecyleRulePrefix != "" { + xml = append(xml, fmt.Sprintf("%s", lifecyleRulePrefix)) + } xml = append(xml, fmt.Sprintf("%s", lifecyleRule.Status)) + if ret := converLifeCycleFilterToXML(lifecyleRule.Filter); ret != "" { + xml = append(xml, ret) + } if ret := convertTransitionsToXML(lifecyleRule.Transitions, isObs); ret != "" { xml = append(xml, ret) } @@ -410,6 +435,9 @@ func ConvertLifecyleConfigurationToXml(input BucketLifecyleConfiguration, return if ret := convertNoncurrentVersionExpirationToXML(lifecyleRule.NoncurrentVersionExpiration); ret != "" { xml = append(xml, ret) } + if ret := convertAbortIncompleteMultipartUploadToXML(lifecyleRule.AbortIncompleteMultipartUpload); ret != "" { + xml = append(xml, ret) + } xml = append(xml, "") } xml = append(xml, "") @@ -749,6 +777,17 @@ func ParseCopyPartOutput(output *CopyPartOutput) { output.SseHeader = parseSseHeader(output.ResponseHeaders) } +// ParseStringToAvailableZoneType converts string value to AvailableZoneType value and returns it +func ParseStringToAvailableZoneType(value string) (ret AvailableZoneType) { + switch value { + case "3az": + ret = AvailableZoneMultiAz + default: + ret = "" + } + return +} + // ParseGetBucketMetadataOutput sets GetBucketMetadataOutput field values with response headers func ParseGetBucketMetadataOutput(output *GetBucketMetadataOutput) { output.AllowOrigin, output.AllowHeader, output.AllowMethod, output.ExposeHeader, output.MaxAgeSeconds = parseCorsHeader(output.BaseModel) @@ -769,7 +808,7 @@ func ParseGetBucketMetadataOutput(output *GetBucketMetadataOutput) { output.Epid = ret[0] } if ret, ok := output.ResponseHeaders[HEADER_AZ_REDUNDANCY]; ok { - output.AZRedundancy = ret[0] + output.AZRedundancy = ParseStringToAvailableZoneType(ret[0]) } if ret, ok := output.ResponseHeaders[HEADER_BUCKET_REDUNDANCY]; ok { output.BucketRedundancy = parseStringToBucketRedundancy(ret[0]) @@ -866,9 +905,6 @@ func ParseGetObjectOutput(output *GetObjectOutput) { func ConvertRequestToIoReaderV2(req interface{}) (io.Reader, string, error) { data, err := TransToXml(req) if err == nil { - if isDebugLogEnabled() { - doLog(LEVEL_DEBUG, "Do http request with data: %s", string(data)) - } return bytes.NewReader(data), Base64Md5(data), nil } return nil, "", err @@ -878,9 +914,6 @@ func ConvertRequestToIoReaderV2(req interface{}) (io.Reader, string, error) { func ConvertRequestToIoReader(req interface{}) (io.Reader, error) { body, err := TransToXml(req) if err == nil { - if isDebugLogEnabled() { - doLog(LEVEL_DEBUG, "Do http request with data: %s", string(body)) - } return bytes.NewReader(body), nil } return nil, err @@ -965,6 +998,13 @@ func ParseResponseToObsError(resp *http.Response, isObs bool) error { doLog(LEVEL_WARN, "Parse response to BaseModel with error: %v", respError) } obsError.Status = resp.Status + responseHeaders := cleanHeaderPrefix(resp.Header) + if values, ok := responseHeaders[HEADER_ERROR_MESSAGE]; ok { + obsError.Message = values[0] + } + if values, ok := responseHeaders[HEADER_ERROR_CODE]; ok { + obsError.Code = values[0] + } return obsError } diff --git a/obs/extension.go b/obs/extension.go index aca9535..baa132c 100644 --- a/obs/extension.go +++ b/obs/extension.go @@ -21,6 +21,11 @@ import ( type extensionOptions interface{} type extensionHeaders func(headers map[string][]string, isObs bool) error +func WithProgress(progressListener ProgressListener) configurer { + return func(conf *config) { + conf.progressListener = progressListener + } +} func setHeaderPrefix(key string, value string) extensionHeaders { return func(headers map[string][]string, isObs bool) error { if strings.TrimSpace(value) == "" { diff --git a/obs/http.go b/obs/http.go index 2ca58dc..58c8010 100644 --- a/obs/http.go +++ b/obs/http.go @@ -17,89 +17,119 @@ import ( "errors" "fmt" "io" + "io/ioutil" "math/rand" "net" "net/http" "net/url" "os" + "strconv" "strings" "time" ) func prepareHeaders(headers map[string][]string, meta bool, isObs bool) map[string][]string { _headers := make(map[string][]string, len(headers)) - if headers != nil { - for key, value := range headers { - key = strings.TrimSpace(key) - if key == "" { + for key, value := range headers { + key = strings.TrimSpace(key) + if key == "" { + continue + } + _key := strings.ToLower(key) + if _, ok := allowedRequestHTTPHeaderMetadataNames[_key]; !ok && !strings.HasPrefix(key, HEADER_PREFIX) && !strings.HasPrefix(key, HEADER_PREFIX_OBS) { + if !meta { continue } - _key := strings.ToLower(key) - if _, ok := allowedRequestHTTPHeaderMetadataNames[_key]; !ok && !strings.HasPrefix(key, HEADER_PREFIX) && !strings.HasPrefix(key, HEADER_PREFIX_OBS) { - if !meta { - continue - } - if !isObs { - _key = HEADER_PREFIX_META + _key - } else { - _key = HEADER_PREFIX_META_OBS + _key - } + if !isObs { + _key = HEADER_PREFIX_META + _key } else { - _key = key + _key = HEADER_PREFIX_META_OBS + _key } - _headers[_key] = value + } else { + _key = key } + _headers[_key] = value } return _headers } +func (obsClient ObsClient) checkParamsWithBucketName(bucketName string) bool { + return strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname +} + +func (obsClient ObsClient) checkParamsWithObjectKey(objectKey string) bool { + return strings.TrimSpace(objectKey) == "" +} + func (obsClient ObsClient) doActionWithoutBucket(action, method string, input ISerializable, output IBaseModel, extensions []extensionOptions) error { - return obsClient.doAction(action, method, "", "", input, output, true, true, extensions) + return obsClient.doAction(action, method, "", "", input, output, true, true, extensions, nil) } func (obsClient ObsClient) doActionWithBucketV2(action, method, bucketName string, input ISerializable, output IBaseModel, extensions []extensionOptions) error { - if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname { + if obsClient.checkParamsWithBucketName(bucketName) { return errors.New("Bucket is empty") } - return obsClient.doAction(action, method, bucketName, "", input, output, false, true, extensions) + return obsClient.doAction(action, method, bucketName, "", input, output, false, true, extensions, nil) } func (obsClient ObsClient) doActionWithBucket(action, method, bucketName string, input ISerializable, output IBaseModel, extensions []extensionOptions) error { - if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname { + if obsClient.checkParamsWithBucketName(bucketName) { return errors.New("Bucket is empty") } - return obsClient.doAction(action, method, bucketName, "", input, output, true, true, extensions) + return obsClient.doAction(action, method, bucketName, "", input, output, true, true, extensions, nil) } func (obsClient ObsClient) doActionWithBucketAndKey(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error { - return obsClient._doActionWithBucketAndKey(action, method, bucketName, objectKey, input, output, true, extensions) + if obsClient.checkParamsWithBucketName(bucketName) { + return errors.New("Bucket is empty") + } + if obsClient.checkParamsWithObjectKey(objectKey) { + return errors.New("Key is empty") + } + return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, true, extensions, nil) +} + +func (obsClient ObsClient) doActionWithBucketAndKeyWithProgress(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions, listener ProgressListener) error { + if obsClient.checkParamsWithBucketName(bucketName) { + return errors.New("Bucket is empty") + } + if obsClient.checkParamsWithObjectKey(objectKey) { + return errors.New("Key is empty") + } + return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, true, extensions, listener) } func (obsClient ObsClient) doActionWithBucketAndKeyV2(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error { - if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname { + if obsClient.checkParamsWithBucketName(bucketName) { return errors.New("Bucket is empty") } - if strings.TrimSpace(objectKey) == "" { + if obsClient.checkParamsWithObjectKey(objectKey) { return errors.New("Key is empty") } - return obsClient.doAction(action, method, bucketName, objectKey, input, output, false, true, extensions) + return obsClient.doAction(action, method, bucketName, objectKey, input, output, false, true, extensions, nil) } func (obsClient ObsClient) doActionWithBucketAndKeyUnRepeatable(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error { - return obsClient._doActionWithBucketAndKey(action, method, bucketName, objectKey, input, output, false, extensions) + if obsClient.checkParamsWithBucketName(bucketName) { + return errors.New("Bucket is empty") + } + if obsClient.checkParamsWithObjectKey(objectKey) { + return errors.New("Key is empty") + } + return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, false, extensions, nil) } -func (obsClient ObsClient) _doActionWithBucketAndKey(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, repeatable bool, extensions []extensionOptions) error { - if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname { +func (obsClient ObsClient) doActionWithBucketAndKeyUnRepeatableWithProgress(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions, listener ProgressListener) error { + if obsClient.checkParamsWithBucketName(bucketName) { return errors.New("Bucket is empty") } - if strings.TrimSpace(objectKey) == "" { + if obsClient.checkParamsWithObjectKey(objectKey) { return errors.New("Key is empty") } - return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, repeatable, extensions) + return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, false, extensions, listener) } -func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, xmlResult bool, repeatable bool, extensions []extensionOptions) error { +func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, xmlResult bool, repeatable bool, extensions []extensionOptions, listener ProgressListener) error { var resp *http.Response var respError error @@ -130,22 +160,8 @@ func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string } } - switch method { - case HTTP_GET: - resp, respError = obsClient.doHTTPGet(bucketName, objectKey, params, headers, data, repeatable) - case HTTP_POST: - resp, respError = obsClient.doHTTPPost(bucketName, objectKey, params, headers, data, repeatable) - case HTTP_PUT: - resp, respError = obsClient.doHTTPPut(bucketName, objectKey, params, headers, data, repeatable) - case HTTP_DELETE: - resp, respError = obsClient.doHTTPDelete(bucketName, objectKey, params, headers, data, repeatable) - case HTTP_HEAD: - resp, respError = obsClient.doHTTPHead(bucketName, objectKey, params, headers, data, repeatable) - case HTTP_OPTIONS: - resp, respError = obsClient.doHTTPOptions(bucketName, objectKey, params, headers, data, repeatable) - default: - respError = errors.New("Unexpect http method error") - } + resp, respError = obsClient.doHTTPRequest(method, bucketName, objectKey, params, headers, data, repeatable, listener) + if respError == nil && output != nil { respError = HandleHttpResponse(action, headers, output, resp, xmlResult, isObs) } else { @@ -159,34 +175,9 @@ func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string return respError } -func (obsClient ObsClient) doHTTPGet(bucketName, objectKey string, params map[string]string, - headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) { - return obsClient.doHTTP(HTTP_GET, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable) -} - -func (obsClient ObsClient) doHTTPHead(bucketName, objectKey string, params map[string]string, - headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) { - return obsClient.doHTTP(HTTP_HEAD, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable) -} - -func (obsClient ObsClient) doHTTPOptions(bucketName, objectKey string, params map[string]string, - headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) { - return obsClient.doHTTP(HTTP_OPTIONS, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable) -} - -func (obsClient ObsClient) doHTTPDelete(bucketName, objectKey string, params map[string]string, - headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) { - return obsClient.doHTTP(HTTP_DELETE, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable) -} - -func (obsClient ObsClient) doHTTPPut(bucketName, objectKey string, params map[string]string, - headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) { - return obsClient.doHTTP(HTTP_PUT, bucketName, objectKey, params, prepareHeaders(headers, true, obsClient.conf.signature == SignatureObs), data, repeatable) -} - -func (obsClient ObsClient) doHTTPPost(bucketName, objectKey string, params map[string]string, - headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) { - return obsClient.doHTTP(HTTP_POST, bucketName, objectKey, params, prepareHeaders(headers, true, obsClient.conf.signature == SignatureObs), data, repeatable) +func (obsClient ObsClient) doHTTPRequest(method, bucketName, objectKey string, params map[string]string, + headers map[string][]string, data interface{}, repeatable bool, listener ProgressListener) (*http.Response, error) { + return obsClient.doHTTP(method, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable, listener) } func prepareAgentHeader(clientUserAgent string) string { @@ -203,7 +194,7 @@ func (obsClient ObsClient) getSignedURLResponse(action string, output IBaseModel respError = err resp = nil } else { - doLog(LEVEL_DEBUG, "Response headers: %v", resp.Header) + doLog(LEVEL_DEBUG, "Response headers: %s", logResponseHeader(resp.Header)) if resp.StatusCode >= 300 { respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs) msg = resp.Status @@ -293,7 +284,7 @@ func prepareData(headers map[string][]string, data interface{}) (io.Reader, erro var _data io.Reader if data != nil { if dataStr, ok := data.(string); ok { - doLog(LEVEL_DEBUG, "Do http request with string: %s", dataStr) + doLog(LEVEL_DEBUG, "Do http request with string") headers["Content-Length"] = []string{IntToString(len(dataStr))} _data = strings.NewReader(dataStr) } else if dataByte, ok := data.([]byte); ok { @@ -360,7 +351,7 @@ func logHeaders(headers map[string][]string, signature SignatureType) { } else if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_OBS]; isSecurityToken { headers[HEADER_STS_TOKEN_OBS] = []string{"******"} } - doLog(LEVEL_DEBUG, "Request headers: %v", headers) + doLog(LEVEL_DEBUG, "Request headers: %s", logRequestHeader(headers)) headers[HEADER_AUTH_CAMEL] = auth if isSecurityToken { if signature == SignatureObs { @@ -475,8 +466,32 @@ func prepareRetry(resp *http.Response, headers map[string][]string, _data io.Rea return _data, resp, nil } +// handleBody handles request body +func handleBody(req *http.Request, body io.Reader, listener ProgressListener, tracker *readerTracker) { + reader := body + readerLen, err := GetReaderLen(reader) + if err == nil { + req.ContentLength = readerLen + } + if req.ContentLength > 0 { + req.Header.Set(HEADER_CONTENT_LENGTH_CAMEL, strconv.FormatInt(req.ContentLength, 10)) + } + + if reader != nil { + reader = TeeReader(reader, req.ContentLength, listener, tracker) + } + + // HTTP body + rc, ok := reader.(io.ReadCloser) + if !ok && reader != nil { + rc = ioutil.NopCloser(reader) + } + + req.Body = rc +} + func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params map[string]string, - headers map[string][]string, data interface{}, repeatable bool) (resp *http.Response, respError error) { + headers map[string][]string, data interface{}, repeatable bool, listener ProgressListener) (resp *http.Response, respError error) { bucketName = strings.TrimSpace(bucketName) @@ -494,6 +509,9 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m var lastRequest *http.Request redirectFlag := false + + tracker := &readerTracker{completedBytes: 0} + for i, redirectCount := 0, 0; i <= maxRetryCount; i++ { req, err := obsClient.getRequest(redirectURL, requestURL, redirectFlag, _data, method, bucketName, objectKey, params, headers) @@ -501,10 +519,16 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m return nil, err } + handleBody(req, _data, listener, tracker) + logHeaders(headers, obsClient.conf.signature) lastRequest = prepareReq(headers, req, lastRequest, obsClient.conf.userAgent) + // Transfer started + event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength) + publishProgress(listener, event) + start := GetCurrentTimestamp() resp, err = obsClient.httpClient.Do(req) doLog(LEVEL_INFO, "Do http request cost %d ms", (GetCurrentTimestamp() - start)) @@ -518,11 +542,14 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m break } } else { - doLog(LEVEL_DEBUG, "Response headers: %v", resp.Header) + doLog(LEVEL_DEBUG, "Response headers: %s", logResponseHeader(resp.Header)) if resp.StatusCode < 300 { respError = nil break } else if canNotRetry(repeatable, resp.StatusCode) { + event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength) + publishProgress(listener, event) + respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs) resp = nil break @@ -564,6 +591,8 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs) resp = nil } + event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength) + publishProgress(listener, event) } } return diff --git a/obs/log.go b/obs/log.go index 20a0631..9896926 100644 --- a/obs/log.go +++ b/obs/log.go @@ -15,6 +15,7 @@ package obs import ( "fmt" "log" + "net/http" "os" "path/filepath" "runtime" @@ -285,6 +286,7 @@ func initLogFile(_fullPath string) (os.FileInfo, *os.File, error) { return nil, nil, err } } + return stat, fd, nil } @@ -317,6 +319,10 @@ func doLog(level Level, format string, v ...interface{}) { msg = fmt.Sprintf("%s:%d|%s", file, line, msg) } prefix := logLevelMap[level] + defer func() { + _ = recover() + // ignore ch closed error + }() if consoleLogger != nil { consoleLogger.Printf("%s%s", prefix, msg) } @@ -332,3 +338,39 @@ func checkAndLogErr(err error, level Level, format string, v ...interface{}) { doLog(level, format, v...) } } + +func logResponseHeader(respHeader http.Header) string { + resp := make([]string, 0, len(respHeader)+1) + for key, value := range respHeader { + key = strings.TrimSpace(key) + if key == "" { + continue + } + if strings.HasPrefix(key, HEADER_PREFIX) || strings.HasPrefix(key, HEADER_PREFIX_OBS) { + key = key[len(HEADER_PREFIX):] + } + _key := strings.ToLower(key) + if _, ok := allowedLogResponseHTTPHeaderNames[_key]; ok { + resp = append(resp, fmt.Sprintf("%s: [%s]", key, value[0])) + } + if _key == HEADER_REQUEST_ID { + resp = append(resp, fmt.Sprintf("%s: [%s]", key, value[0])) + } + } + return strings.Join(resp, " ") +} + +func logRequestHeader(reqHeader http.Header) string { + resp := make([]string, 0, len(reqHeader)+1) + for key, value := range reqHeader { + key = strings.TrimSpace(key) + if key == "" { + continue + } + _key := strings.ToLower(key) + if _, ok := allowedRequestHTTPHeaderMetadataNames[_key]; ok { + resp = append(resp, fmt.Sprintf("%s: [%s]", key, value[0])) + } + } + return strings.Join(resp, " ") +} diff --git a/obs/mime.go b/obs/mime.go index 8fb11b5..3fba97a 100644 --- a/obs/mime.go +++ b/obs/mime.go @@ -391,9 +391,13 @@ var mimeTypes = map[string]string{ "zip": "application/zip", "dotx": "application/vnd.openxmlformats-officedocument.wordprocessingml.template", "wps": "application/vnd.ms-works", - "wpt": "x-lml/x-gps", "pptm": "application/vnd.ms-powerpoint.presentation.macroenabled.12", "heic": "image/heic", "mkv": "video/x-matroska", "raw": "image/x-panasonic-raw", + "webp": "image/webp", + "3gp": "audio/3gpp", + "3g2": "audio/3gpp2", + "weba": "audio/webm", + "woff2": "font/woff2", } diff --git a/obs/model_base.go b/obs/model_base.go index 5faabd3..611e521 100644 --- a/obs/model_base.go +++ b/obs/model_base.go @@ -247,9 +247,10 @@ type Transition struct { // Expiration defines expiration property in LifecycleRule type Expiration struct { - XMLName xml.Name `xml:"Expiration"` - Date time.Time `xml:"Date,omitempty"` - Days int `xml:"Days,omitempty"` + XMLName xml.Name `xml:"Expiration"` + Date time.Time `xml:"Date,omitempty"` + Days int `xml:"Days,omitempty"` + ExpiredObjectDeleteMarker string `xml:"ExpiredObjectDeleteMarker,omitempty"` } // NoncurrentVersionTransition defines noncurrentVersion transition property in LifecycleRule @@ -265,15 +266,29 @@ type NoncurrentVersionExpiration struct { NoncurrentDays int `xml:"NoncurrentDays"` } +// AbortIncompleteMultipartUpload defines abortIncomplete expiration property in LifecycleRule +type AbortIncompleteMultipartUpload struct { + XMLName xml.Name `xml:"AbortIncompleteMultipartUpload"` + DaysAfterInitiation int `xml:"DaysAfterInitiation"` +} + // LifecycleRule defines lifecycle rule type LifecycleRule struct { - ID string `xml:"ID,omitempty"` - Prefix string `xml:"Prefix"` - Status RuleStatusType `xml:"Status"` - Transitions []Transition `xml:"Transition,omitempty"` - Expiration Expiration `xml:"Expiration,omitempty"` - NoncurrentVersionTransitions []NoncurrentVersionTransition `xml:"NoncurrentVersionTransition,omitempty"` - NoncurrentVersionExpiration NoncurrentVersionExpiration `xml:"NoncurrentVersionExpiration,omitempty"` + ID string `xml:"ID,omitempty"` + Prefix string `xml:"Prefix"` + Status RuleStatusType `xml:"Status"` + Transitions []Transition `xml:"Transition,omitempty"` + Expiration Expiration `xml:"Expiration,omitempty"` + NoncurrentVersionTransitions []NoncurrentVersionTransition `xml:"NoncurrentVersionTransition,omitempty"` + NoncurrentVersionExpiration NoncurrentVersionExpiration `xml:"NoncurrentVersionExpiration,omitempty"` + AbortIncompleteMultipartUpload AbortIncompleteMultipartUpload `xml:"AbortIncompleteMultipartUpload,omitempty"` + Filter LifecycleFilter `xml:"Filter,omitempty"` +} + +type LifecycleFilter struct { + XMLName xml.Name `xml:"Filter"` + Prefix string `xml:"And>Prefix,omitempty"` + Tags []Tag `xml:"And>Tag,omitempty"` } // BucketEncryptionConfiguration defines the bucket encryption configuration diff --git a/obs/model_bucket.go b/obs/model_bucket.go index 866e656..c3033fd 100644 --- a/obs/model_bucket.go +++ b/obs/model_bucket.go @@ -55,14 +55,20 @@ type Domain struct { type ListBucketsInput struct { QueryLocation bool BucketType BucketType + MaxKeys int + Marker string } // ListBucketsOutput is the result of ListBuckets function type ListBucketsOutput struct { BaseModel - XMLName xml.Name `xml:"ListAllMyBucketsResult"` - Owner Owner `xml:"Owner"` - Buckets []Bucket `xml:"Buckets>Bucket"` + XMLName xml.Name `xml:"ListAllMyBucketsResult"` + Owner Owner `xml:"Owner"` + Buckets []Bucket `xml:"Buckets>Bucket"` + IsTruncated bool `xml:"IsTruncated"` + Marker string `xml:"Marker"` + NextMarker string `xml:"NextMarker"` + MaxKeys int `xml:"MaxKeys"` } // CreateBucketInput is the input parameter of CreateBucket function @@ -222,15 +228,11 @@ type SetObjectMetadataInput struct { Key string VersionId string MetadataDirective MetadataDirectiveType - CacheControl string - ContentDisposition string - ContentEncoding string - ContentLanguage string - ContentType string Expires string WebsiteRedirectLocation string StorageClass StorageClassType Metadata map[string]string + HttpHeader } //SetObjectMetadataOutput is the result of SetObjectMetadata function @@ -260,7 +262,7 @@ type GetBucketMetadataOutput struct { MaxAgeSeconds int ExposeHeader string Epid string - AZRedundancy string + AZRedundancy AvailableZoneType FSStatus FSStatusType BucketRedundancy BucketRedundancyType } diff --git a/obs/model_header.go b/obs/model_header.go index c76d9f0..65dab04 100644 --- a/obs/model_header.go +++ b/obs/model_header.go @@ -9,6 +9,7 @@ // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. + package obs // ISseHeader defines the sse encryption header diff --git a/obs/model_object.go b/obs/model_object.go index b5fb0d0..9e8a084 100644 --- a/obs/model_object.go +++ b/obs/model_object.go @@ -194,6 +194,7 @@ type GetObjectInput struct { GetObjectMetadataInput IfMatch string IfNoneMatch string + AcceptEncoding string IfUnmodifiedSince time.Time IfModifiedSince time.Time RangeStart int64 @@ -233,6 +234,7 @@ type ObjectOperationInput struct { Expires int64 SseHeader ISseHeader Metadata map[string]string + HttpHeader } // PutObjectBasicInput defines the basic object operation properties @@ -240,7 +242,6 @@ type PutObjectBasicInput struct { ObjectOperationInput ContentMD5 string ContentLength int64 - HttpHeader } // PutObjectInput is the input parameter of PutObject function diff --git a/obs/model_other.go b/obs/model_other.go index 8ede31d..4d4a320 100644 --- a/obs/model_other.go +++ b/obs/model_other.go @@ -9,6 +9,7 @@ // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. + package obs import ( @@ -20,6 +21,7 @@ type CreateSignedUrlInput struct { Method HttpMethodType Bucket string Key string + Policy string SubResource SubResourceType Expires int Headers map[string]string diff --git a/obs/model_part.go b/obs/model_part.go index f9503fd..66f485e 100644 --- a/obs/model_part.go +++ b/obs/model_part.go @@ -9,6 +9,7 @@ // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. + package obs import ( diff --git a/obs/model_response.go b/obs/model_response.go index a8db53e..ec2536c 100644 --- a/obs/model_response.go +++ b/obs/model_response.go @@ -9,6 +9,7 @@ // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. + package obs import ( diff --git a/obs/progress.go b/obs/progress.go new file mode 100644 index 0000000..89fecfc --- /dev/null +++ b/obs/progress.go @@ -0,0 +1,101 @@ +package obs + +import ( + "io" +) + +type ProgressEventType int + +type ProgressEvent struct { + ConsumedBytes int64 + TotalBytes int64 + EventType ProgressEventType +} + +const ( + TransferStartedEvent ProgressEventType = 1 + iota + TransferDataEvent + TransferCompletedEvent + TransferFailedEvent +) + +func newProgressEvent(eventType ProgressEventType, consumed, total int64) *ProgressEvent { + return &ProgressEvent{ + ConsumedBytes: consumed, + TotalBytes: total, + EventType: eventType, + } +} + +type ProgressListener interface { + ProgressChanged(event *ProgressEvent) +} + +type readerTracker struct { + completedBytes int64 +} + +// publishProgress +func publishProgress(listener ProgressListener, event *ProgressEvent) { + if listener != nil && event != nil { + listener.ProgressChanged(event) + } +} + +type teeReader struct { + reader io.Reader + consumedBytes int64 + totalBytes int64 + tracker *readerTracker + listener ProgressListener +} + +func TeeReader(reader io.Reader, totalBytes int64, listener ProgressListener, tracker *readerTracker) io.ReadCloser { + return &teeReader{ + reader: reader, + consumedBytes: 0, + totalBytes: totalBytes, + tracker: tracker, + listener: listener, + } +} + +func (t *teeReader) Read(p []byte) (n int, err error) { + n, err = t.reader.Read(p) + + if err != nil && err != io.EOF { + event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes) + publishProgress(t.listener, event) + } + + if n > 0 { + t.consumedBytes += int64(n) + + if t.listener != nil { + event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes) + publishProgress(t.listener, event) + } + + if t.tracker != nil { + t.tracker.completedBytes = t.consumedBytes + } + } + + if err == io.EOF { + event := newProgressEvent(TransferCompletedEvent, t.consumedBytes, t.totalBytes) + publishProgress(t.listener, event) + } + + return +} + +func (r *teeReader) Size() int64 { + return r.totalBytes +} + +func (t *teeReader) Close() error { + if rc, ok := t.reader.(io.ReadCloser); ok { + return rc.Close() + } + return nil +} diff --git a/obs/provider.go b/obs/provider.go index 2e485c1..297efa4 100644 --- a/obs/provider.go +++ b/obs/provider.go @@ -206,7 +206,6 @@ func (ecsSp *EcsSecurityProvider) getSecurity() securityHolder { } func getInternalTransport() *http.Transport { - timeout := 10 transport := &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { diff --git a/obs/temporary_createSignedUrl.go b/obs/temporary_createSignedUrl.go index 5c27e64..14d7742 100644 --- a/obs/temporary_createSignedUrl.go +++ b/obs/temporary_createSignedUrl.go @@ -52,7 +52,7 @@ func (obsClient ObsClient) CreateSignedUrl(input *CreateSignedUrlInput, extensio input.Expires = 300 } - requestURL, err := obsClient.doAuthTemporary(string(input.Method), input.Bucket, input.Key, params, headers, int64(input.Expires)) + requestURL, err := obsClient.doAuthTemporary(string(input.Method), input.Bucket, input.Key, input.Policy, params, headers, int64(input.Expires)) if err != nil { return nil, err } diff --git a/obs/temporary_signedUrl.go b/obs/temporary_signedUrl.go index be92782..c6d6f6f 100644 --- a/obs/temporary_signedUrl.go +++ b/obs/temporary_signedUrl.go @@ -508,7 +508,7 @@ func (obsClient ObsClient) GetObjectMetadataWithSignedUrl(signedUrl string, actu // GetObjectWithSignedUrl downloads object with the specified signed url and signed request headers func (obsClient ObsClient) GetObjectWithSignedUrl(signedUrl string, actualSignedRequestHeaders http.Header) (output *GetObjectOutput, err error) { output = &GetObjectOutput{} - err = obsClient.doHTTPWithSignedURL("GetObject", HTTP_GET, signedUrl, actualSignedRequestHeaders, nil, output, true) + err = obsClient.doHTTPWithSignedURL(GET_OBJECT, HTTP_GET, signedUrl, actualSignedRequestHeaders, nil, output, true) if err != nil { output = nil } else { @@ -520,7 +520,7 @@ func (obsClient ObsClient) GetObjectWithSignedUrl(signedUrl string, actualSigned // PutObjectWithSignedUrl uploads an object to the specified bucket with the specified signed url and signed request headers and data func (obsClient ObsClient) PutObjectWithSignedUrl(signedUrl string, actualSignedRequestHeaders http.Header, data io.Reader) (output *PutObjectOutput, err error) { output = &PutObjectOutput{} - err = obsClient.doHTTPWithSignedURL("PutObject", HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true) + err = obsClient.doHTTPWithSignedURL(PUT_OBJECT, HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true) if err != nil { output = nil } else { @@ -570,7 +570,7 @@ func (obsClient ObsClient) PutFileWithSignedUrl(signedUrl string, actualSignedRe } output = &PutObjectOutput{} - err = obsClient.doHTTPWithSignedURL("PutObject", HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true) + err = obsClient.doHTTPWithSignedURL(PUT_FILE, HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true) if err != nil { output = nil } else { @@ -734,7 +734,7 @@ func (obsClient ObsClient) DeleteBucketEncryptionWithSignedURL(signedURL string, // AppendObjectWithSignedUrl uploads an object to the specified bucket with the specified signed url and signed request headers and data func (obsClient ObsClient) AppendObjectWithSignedURL(signedURL string, actualSignedRequestHeaders http.Header, data io.Reader) (output *AppendObjectOutput, err error) { output = &AppendObjectOutput{} - err = obsClient.doHTTPWithSignedURL("AppendObject", HTTP_POST, signedURL, actualSignedRequestHeaders, data, output, true) + err = obsClient.doHTTPWithSignedURL(APPEND_OBJECT, HTTP_POST, signedURL, actualSignedRequestHeaders, data, output, true) if err != nil { output = nil } else { diff --git a/obs/trait_base.go b/obs/trait_base.go index 63bddb2..909c7a0 100644 --- a/obs/trait_base.go +++ b/obs/trait_base.go @@ -16,6 +16,10 @@ import ( "io" ) +type IRepeatable interface { + Reset() error +} + // IReadCloser defines interface with function: setReadCloser type IReadCloser interface { setReadCloser(body io.ReadCloser) diff --git a/obs/trait_bucket.go b/obs/trait_bucket.go index 97b66e8..2818084 100644 --- a/obs/trait_bucket.go +++ b/obs/trait_bucket.go @@ -18,6 +18,13 @@ import ( ) func (input ListBucketsInput) trans(isObs bool) (params map[string]string, headers map[string][]string, data interface{}, err error) { + params = make(map[string]string) + if input.MaxKeys > 0 { + params["max-keys"] = IntToString(input.MaxKeys) + } + if input.Marker != "" { + params["marker"] = input.Marker + } headers = make(map[string][]string) if input.QueryLocation && !isObs { setHeaders(headers, HEADER_LOCATION_AMZ, []string{"true"}, isObs) diff --git a/obs/trait_object.go b/obs/trait_object.go index 7aaf425..48011ca 100644 --- a/obs/trait_object.go +++ b/obs/trait_object.go @@ -148,6 +148,9 @@ func (input GetObjectMetadataInput) trans(isObs bool) (params map[string]string, } func (input SetObjectMetadataInput) prepareContentHeaders(headers map[string][]string) { + if input.CacheControl != "" { + headers[HEADER_CACHE_CONTROL_CAMEL] = []string{input.CacheControl} + } if input.ContentDisposition != "" { headers[HEADER_CONTENT_DISPOSITION_CAMEL] = []string{input.ContentDisposition} } @@ -157,10 +160,15 @@ func (input SetObjectMetadataInput) prepareContentHeaders(headers map[string][]s if input.ContentLanguage != "" { headers[HEADER_CONTENT_LANGUAGE_CAMEL] = []string{input.ContentLanguage} } - if input.ContentType != "" { headers[HEADER_CONTENT_TYPE_CAML] = []string{input.ContentType} } + // 这里为了兼容老版本,默认以Expire值为准,但如果Expires没有,则以HttpExpires为准。 + if input.Expires != "" { + headers[HEADER_EXPIRES_CAMEL] = []string{input.Expires} + } else if input.HttpExpires != "" { + headers[HEADER_EXPIRES_CAMEL] = []string{input.HttpExpires} + } } func (input SetObjectMetadataInput) prepareStorageClass(headers map[string][]string, isObs bool) { @@ -189,13 +197,8 @@ func (input SetObjectMetadataInput) trans(isObs bool) (params map[string]string, } else { setHeaders(headers, HEADER_METADATA_DIRECTIVE, []string{string(ReplaceNew)}, isObs) } - if input.CacheControl != "" { - headers[HEADER_CACHE_CONTROL_CAMEL] = []string{input.CacheControl} - } + input.prepareContentHeaders(headers) - if input.Expires != "" { - headers[HEADER_EXPIRES_CAMEL] = []string{input.Expires} - } if input.WebsiteRedirectLocation != "" { setHeaders(headers, HEADER_WEBSITE_REDIRECT_LOCATION, []string{input.WebsiteRedirectLocation}, isObs) } @@ -242,7 +245,9 @@ func (input GetObjectInput) trans(isObs bool) (params map[string]string, headers if input.RangeStart >= 0 && input.RangeEnd > input.RangeStart { headers[HEADER_RANGE] = []string{fmt.Sprintf("bytes=%d-%d", input.RangeStart, input.RangeEnd)} } - + if input.AcceptEncoding != "" { + headers[HEADER_ACCEPT_ENCODING] = []string{input.AcceptEncoding} + } if input.IfMatch != "" { headers[HEADER_IF_MATCH] = []string{input.IfMatch} } diff --git a/obs/transfer.go b/obs/transfer.go index 956629e..d1a783c 100644 --- a/obs/transfer.go +++ b/obs/transfer.go @@ -364,12 +364,18 @@ func (obsClient ObsClient) resumeUpload(input *UploadFileInput, extensions []ext return completeOutput, err } -func handleUploadTaskResult(result interface{}, ufc *UploadCheckpoint, partNum int, enableCheckpoint bool, checkpointFilePath string, lock *sync.Mutex) (err error) { +func handleUploadTaskResult(result interface{}, ufc *UploadCheckpoint, partNum int, enableCheckpoint bool, checkpointFilePath string, lock *sync.Mutex, completedBytes *int64, listener ProgressListener) (err error) { if uploadPartOutput, ok := result.(*UploadPartOutput); ok { lock.Lock() defer lock.Unlock() ufc.UploadParts[partNum-1].Etag = uploadPartOutput.ETag ufc.UploadParts[partNum-1].IsCompleted = true + + atomic.AddInt64(completedBytes, ufc.UploadParts[partNum-1].PartSize) + + event := newProgressEvent(TransferDataEvent, *completedBytes, ufc.FileInfo.Size) + publishProgress(listener, event) + if enableCheckpoint { _err := updateCheckpointFile(ufc, checkpointFilePath) if _err != nil { @@ -390,11 +396,21 @@ func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpoin var errFlag int32 var abort int32 lock := new(sync.Mutex) + + var completedBytes int64 + listener := obsClient.getProgressListener(extensions) + totalBytes := ufc.FileInfo.Size + event := newProgressEvent(TransferStartedEvent, 0, totalBytes) + publishProgress(listener, event) + for _, uploadPart := range ufc.UploadParts { if atomic.LoadInt32(&abort) == 1 { break } if uploadPart.IsCompleted { + atomic.AddInt64(&completedBytes, uploadPart.PartSize) + event := newProgressEvent(TransferDataEvent, completedBytes, ufc.FileInfo.Size) + publishProgress(listener, event) continue } task := uploadPartTask{ @@ -415,7 +431,7 @@ func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpoin } pool.ExecuteFunc(func() interface{} { result := task.Run() - err := handleUploadTaskResult(result, ufc, task.PartNumber, input.EnableCheckpoint, input.CheckpointFile, lock) + err := handleUploadTaskResult(result, ufc, task.PartNumber, input.EnableCheckpoint, input.CheckpointFile, lock, &completedBytes, listener) if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) { uploadPartError.Store(err) } @@ -424,8 +440,14 @@ func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpoin } pool.ShutDown() if err, ok := uploadPartError.Load().(error); ok { + + event := newProgressEvent(TransferFailedEvent, completedBytes, ufc.FileInfo.Size) + publishProgress(listener, event) + return err } + event = newProgressEvent(TransferCompletedEvent, completedBytes, ufc.FileInfo.Size) + publishProgress(listener, event) return nil } @@ -513,9 +535,9 @@ func (task *downloadPartTask) Run() interface{} { var output *GetObjectOutput var err error if len(task.extensions) != 0 { - output, err = task.obsClient.GetObject(getObjectInput, task.extensions...) + output, err = task.obsClient.GetObjectWithoutProgress(getObjectInput, task.extensions...) } else { - output, err = task.obsClient.GetObject(getObjectInput) + output, err = task.obsClient.GetObjectWithoutProgress(getObjectInput) } if err == nil { @@ -807,11 +829,17 @@ func updateDownloadFile(filePath string, rangeStart int64, output *GetObjectOutp return nil } -func handleDownloadTaskResult(result interface{}, dfc *DownloadCheckpoint, partNum int64, enableCheckpoint bool, checkpointFile string, lock *sync.Mutex) (err error) { - if _, ok := result.(*GetObjectOutput); ok { +func handleDownloadTaskResult(result interface{}, dfc *DownloadCheckpoint, partNum int64, enableCheckpoint bool, checkpointFile string, lock *sync.Mutex, completedBytes *int64, listener ProgressListener) (err error) { + if output, ok := result.(*GetObjectOutput); ok { lock.Lock() defer lock.Unlock() dfc.DownloadParts[partNum-1].IsCompleted = true + + atomic.AddInt64(completedBytes, output.ContentLength) + + event := newProgressEvent(TransferDataEvent, *completedBytes, dfc.ObjectInfo.Size) + publishProgress(listener, event) + if enableCheckpoint { _err := updateCheckpointFile(dfc, checkpointFile) if _err != nil { @@ -832,11 +860,21 @@ func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc var errFlag int32 var abort int32 lock := new(sync.Mutex) + + var completedBytes int64 + listener := obsClient.getProgressListener(extensions) + totalBytes := dfc.ObjectInfo.Size + event := newProgressEvent(TransferStartedEvent, 0, totalBytes) + publishProgress(listener, event) + for _, downloadPart := range dfc.DownloadParts { if atomic.LoadInt32(&abort) == 1 { break } if downloadPart.IsCompleted { + atomic.AddInt64(&completedBytes, downloadPart.RangeEnd-downloadPart.Offset+1) + event := newProgressEvent(TransferDataEvent, completedBytes, dfc.ObjectInfo.Size) + publishProgress(listener, event) continue } task := downloadPartTask{ @@ -858,7 +896,7 @@ func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc } pool.ExecuteFunc(func() interface{} { result := task.Run() - err := handleDownloadTaskResult(result, dfc, task.partNumber, input.EnableCheckpoint, input.CheckpointFile, lock) + err := handleDownloadTaskResult(result, dfc, task.partNumber, input.EnableCheckpoint, input.CheckpointFile, lock, &completedBytes, listener) if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) { downloadPartError.Store(err) } @@ -867,8 +905,11 @@ func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc } pool.ShutDown() if err, ok := downloadPartError.Load().(error); ok { + event := newProgressEvent(TransferFailedEvent, completedBytes, dfc.ObjectInfo.Size) + publishProgress(listener, event) return err } - + event = newProgressEvent(TransferCompletedEvent, completedBytes, dfc.ObjectInfo.Size) + publishProgress(listener, event) return nil } diff --git a/obs/util.go b/obs/util.go index 15683d3..062cc10 100644 --- a/obs/util.go +++ b/obs/util.go @@ -13,6 +13,7 @@ package obs import ( + "bytes" "crypto/hmac" "crypto/md5" "crypto/sha1" @@ -22,8 +23,10 @@ import ( "encoding/json" "encoding/xml" "fmt" + "io" "net/http" "net/url" + "os" "regexp" "strconv" "strings" @@ -68,7 +71,7 @@ func IsHandleCallbackResponse(action string, headers map[string][]string, isObs if isObs == true { headerPrefix = HEADER_PREFIX_OBS } - supportCallbackActions := []string{"PutObject", "PutFile", "CompleteMultipartUpload"} + supportCallbackActions := []string{PUT_OBJECT, PUT_FILE, "CompleteMultipartUpload"} return len(headers[headerPrefix+CALLBACK]) != 0 && IsContain(supportCallbackActions, action) } @@ -581,3 +584,37 @@ func getTemporaryAuthorization(ak, sk, method, bucketName, objectKey, signature return } + +func GetContentType(key string) (string, bool) { + if ct, ok := mimeTypes[strings.ToLower(key[strings.LastIndex(key, ".")+1:])]; ok { + return ct, ok + } + return "", false +} + +func GetReaderLen(reader io.Reader) (int64, error) { + var contentLength int64 + var err error + switch v := reader.(type) { + case *bytes.Buffer: + contentLength = int64(v.Len()) + case *bytes.Reader: + contentLength = int64(v.Len()) + case *strings.Reader: + contentLength = int64(v.Len()) + case *os.File: + fInfo, fError := v.Stat() + if fError != nil { + err = fmt.Errorf("can't get reader content length,%s", fError.Error()) + } else { + contentLength = fInfo.Size() + } + case *io.LimitedReader: + contentLength = int64(v.N) + case *fileReaderWrapper: + contentLength = int64(v.totalCount) + default: + err = fmt.Errorf("can't get reader content length,unkown reader type") + } + return contentLength, err +}