diff --git a/README.MD b/README.MD
index 060ac9b..58ebfab 100644
--- a/README.MD
+++ b/README.MD
@@ -1,16 +1,52 @@
-Version 3.21.12
+Version 3.23.9
New Features:
-1. Added the `obs.WithTrafficLimitHeader` method to limit the bandwidth of a single connection.
+1. Added the obs.WithProgress method to support progress bar callback.
+2. Added fragment expiration time in configuration life cycle management rules.
Documentation & Demo:
-1. Added descriptions about single-connection bandwidth throttling.
+
+-----------------------------------------------------------------------------------
+
+Version 3.23.4
+
+New Features:
+1. Added the obs.WithCustomHeader method when upload.
+2. Added bucket customDomain-related APIs, including SetBucketCustomDomain, GetBucketCustomDomain, and DeleteBucketCustomDomain.
+3. Added bucket mirrorBackToSource-related APIs, including SetBucketMirrorBackToSource, GetBucketMirrorBackToSource, and DeleteBucketMirrorBackToSource.
+
+Documentation & Demo:
+1. Added descriptions about bucket customDomain APIs.
+2. Added descriptions about bucket mirrorBackToSource APIs.
+
+-----------------------------------------------------------------------------------
+Version 3.23.3
+
+New Features:
+1. Added the obs.WithCallbackHeader method when upload.
+
+Documentation & Demo:
+1. Added descriptions about callback when upload.
2. Added descriptions about extended configurations of SDK APIs.
Resolved Issues:
1. Optimized some code.
-----------------------------------------------------------------------------------
+Version 3.21.12
+
+New Features:
+1. Added obs.WithTrafficLimitHeader method to realize single-connection bandwidth throttling.
+
+Documentation & Demo:
+1. Added descriptions about single-connection bandwidth throttling.
+2. Added descriptions about the SDK interface extension configuration.
+
+Resolved Issues:
+1. Optimized part of the code.
+
+-----------------------------------------------------------------------------------
+
Version 3.21.8
New Features:
diff --git a/README_CN.MD b/README_CN.MD
index 90f2f01..8dd6c8e 100644
--- a/README_CN.MD
+++ b/README_CN.MD
@@ -1,3 +1,37 @@
+Version 3.23.9
+
+新特性:
+1. 新增obs.WithProgress方法支持进度条回调。
+2. 新增配置生命周期管理规则中的碎片过期时间。
+
+资料&demo:
+
+-----------------------------------------------------------------------------------
+Version 3.23.4
+
+新特性:
+1. 新增obs.WithCustomHeader方法。
+2. 新增自定义域名相关API,包括SetBucketCustomDomain,GetBucketCustomDomain,DeleteBucketCustomDomain。
+3. 新增镜像回源相关API,包括SetBucketMirrorBackToSource、GetBucketMirrorBackToSource、DeleteBucketMirrorBackToSource。
+
+资料&demo:
+1. 补充自定义域名相关API的描述。
+2. 补充镜像回源相关API的描述。
+
+-----------------------------------------------------------------------------------
+Version 3.23.3
+
+新特性:
+1. 新增obs.WithCallbackHeader方法;
+
+资料&demo:
+1. 补充上传时回调的说明。
+2. 补充SDK API扩展配置说明。
+
+修复问题:
+1. 优化部分代码实现
+
+-----------------------------------------------------------------------------------
Version 3.21.12
新特性:
diff --git a/main/obs_go_sample.go b/main/obs_go_sample.go
index 08265aa..f05e71f 100644
--- a/main/obs_go_sample.go
+++ b/main/obs_go_sample.go
@@ -359,7 +359,7 @@ func setBucketCors() {
var corsRules [2]obs.CorsRule
corsRule0 := obs.CorsRule{}
corsRule0.ID = "rule1"
- corsRule0.AllowedOrigin = []string{"http://www.a.com", "http://www.b.com"}
+ corsRule0.AllowedOrigin = []string{"***"}
corsRule0.AllowedMethod = []string{"GET", "PUT", "POST", "HEAD"}
corsRule0.AllowedHeader = []string{"header1", "header2"}
corsRule0.MaxAgeSeconds = 100
@@ -368,7 +368,7 @@ func setBucketCors() {
corsRule1 := obs.CorsRule{}
corsRule1.ID = "rule2"
- corsRule1.AllowedOrigin = []string{"http://www.c.com", "http://www.d.com"}
+ corsRule1.AllowedOrigin = []string{"***"}
corsRule1.AllowedMethod = []string{"GET", "PUT", "POST", "HEAD"}
corsRule1.AllowedHeader = []string{"header3", "header4"}
corsRule1.MaxAgeSeconds = 50
@@ -580,7 +580,7 @@ func getBucketLoggingConfiguration() {
func setBucketWebsiteConfiguration() {
input := &obs.SetBucketWebsiteConfigurationInput{}
input.Bucket = bucketName
- // input.RedirectAllRequestsTo.HostName = "www.a.com"
+ // input.RedirectAllRequestsTo.HostName = "***"
// input.RedirectAllRequestsTo.Protocol = obs.ProtocolHttp
input.IndexDocument.Suffix = "suffix"
input.ErrorDocument.Key = "key"
@@ -588,7 +588,7 @@ func setBucketWebsiteConfiguration() {
var routingRules [2]obs.RoutingRule
routingRule0 := obs.RoutingRule{}
- routingRule0.Redirect.HostName = "www.a.com"
+ routingRule0.Redirect.HostName = "***"
routingRule0.Redirect.Protocol = obs.ProtocolHttp
routingRule0.Redirect.ReplaceKeyPrefixWith = "prefix"
routingRule0.Redirect.HttpRedirectCode = "304"
@@ -596,7 +596,7 @@ func setBucketWebsiteConfiguration() {
routingRule1 := obs.RoutingRule{}
- routingRule1.Redirect.HostName = "www.b.com"
+ routingRule1.Redirect.HostName = "***"
routingRule1.Redirect.Protocol = obs.ProtocolHttps
routingRule1.Redirect.ReplaceKeyWith = "replaceKey"
routingRule1.Redirect.HttpRedirectCode = "304"
@@ -822,7 +822,7 @@ func deleteBucketMirrorBackToSource() {
func setBucketCustomDomain() {
input := &obs.SetBucketCustomDomainInput{}
input.Bucket = bucketName
- input.CustomDomain = "www.example.com"
+ input.CustomDomain = "***"
output, err := getObsClient().SetBucketCustomDomain(input)
if err == nil {
@@ -860,7 +860,7 @@ func getBucketCustomDomain() {
func deleteBucketCustomdomain() {
input := &obs.DeleteBucketCustomDomainInput{}
input.Bucket = bucketName
- input.CustomDomain = "www.test-go4444.com"
+ input.CustomDomain = "***"
output, err := getObsClient().DeleteBucketCustomDomain(input)
if err == nil {
@@ -1602,6 +1602,126 @@ func renameFolder() {
}
+// 定义进度条监听器。
+type ObsProgressListener struct {
+}
+
+// 定义进度变更事件处理函数。
+func (listener *ObsProgressListener) ProgressChanged(event *obs.ProgressEvent) {
+ switch event.EventType {
+ case obs.TransferStartedEvent:
+ fmt.Printf("Transfer Started, ConsumedBytes: %d, TotalBytes %d.\n",
+ event.ConsumedBytes, event.TotalBytes)
+ case obs.TransferDataEvent:
+ fmt.Printf("\rTransfer Data, ConsumedBytes: %d, TotalBytes %d, %d%%.\n",
+ event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
+ case obs.TransferCompletedEvent:
+ fmt.Printf("\nTransfer Completed, ConsumedBytes: %d, TotalBytes %d.\n",
+ event.ConsumedBytes, event.TotalBytes)
+ case obs.TransferFailedEvent:
+ fmt.Printf("\nTransfer Failed, ConsumedBytes: %d, TotalBytes %d.\n",
+ event.ConsumedBytes, event.TotalBytes)
+ default:
+ }
+}
+
+func getObjectWithProgress() {
+ input := &obs.GetObjectInput{}
+ input.Bucket = bucketName
+ input.Key = objectKey
+ output, err := getObsClient().GetObject(input, obs.WithProgress(&ObsProgressListener{}))
+ if err == nil {
+ defer output.Body.Close()
+ fmt.Printf("StatusCode:%d, RequestId:%s\n", output.StatusCode, output.RequestId)
+ fmt.Printf("StorageClass:%s, ETag:%s, ContentType:%s, ContentLength:%d, LastModified:%s\n",
+ output.StorageClass, output.ETag, output.ContentType, output.ContentLength, output.LastModified)
+ p := make([]byte, 1024)
+ var readErr error
+ var readCount int
+ for {
+ readCount, readErr = output.Body.Read(p)
+ if readCount > 0 {
+ fmt.Printf("%s", p[:readCount])
+ }
+ if readErr != nil {
+ break
+ }
+ }
+ } else {
+ if obsError, ok := err.(obs.ObsError); ok {
+ fmt.Println(obsError.StatusCode)
+ fmt.Println(obsError.Code)
+ fmt.Println(obsError.Message)
+ } else {
+ fmt.Println(err)
+ }
+ }
+}
+
+func putObjectWithProgress() {
+ input := &obs.PutObjectInput{}
+ input.Bucket = bucketName
+ input.Key = objectKey
+ input.Metadata = map[string]string{"meta": "value"}
+ input.Body = strings.NewReader("Hello OBS")
+ output, err := getObsClient().PutObject(input, obs.WithProgress(&ObsProgressListener{}))
+ if err == nil {
+ fmt.Printf("StatusCode:%d, RequestId:%s\n", output.StatusCode, output.RequestId)
+ fmt.Printf("ETag:%s, StorageClass:%s\n", output.ETag, output.StorageClass)
+ } else {
+ if obsError, ok := err.(obs.ObsError); ok {
+ fmt.Println(obsError.StatusCode)
+ fmt.Println(obsError.Code)
+ fmt.Println(obsError.Message)
+ } else {
+ fmt.Println(err)
+ }
+ }
+}
+
+func putFileWithProgress() {
+ input := &obs.PutFileInput{}
+ input.Bucket = bucketName
+ input.Key = objectKey
+ input.SourceFile = "localfile"
+ output, err := getObsClient().PutFile(input, obs.WithProgress(&ObsProgressListener{}))
+ if err == nil {
+ fmt.Printf("StatusCode:%d, RequestId:%s\n", output.StatusCode, output.RequestId)
+ fmt.Printf("ETag:%s, StorageClass:%s\n", output.ETag, output.StorageClass)
+ } else {
+ if obsError, ok := err.(obs.ObsError); ok {
+ fmt.Println(obsError.StatusCode)
+ fmt.Println(obsError.Code)
+ fmt.Println(obsError.Message)
+ } else {
+ fmt.Println(err)
+ }
+ }
+}
+
+func appendObjectWithProgress() {
+ input := &obs.AppendObjectInput{}
+ input.Bucket = bucketName
+ input.Key = objectKey
+ input.Position = 9
+ input.Body = strings.NewReader("Hello OBS")
+ output, err := getObsClient().AppendObject(input, obs.WithProgress(&ObsProgressListener{}))
+ if err == nil {
+ fmt.Printf("Append object(%s) under the bucket(%s) successful!\n", input.Key, input.Bucket)
+ fmt.Printf("ETag:%s, NextAppendPosition:%d\n", output.ETag, output.NextAppendPosition)
+ return
+ }
+ fmt.Printf("Append objects under the bucket(%s) fail!\n", input.Bucket)
+ if obsError, ok := err.(obs.ObsError); ok {
+ fmt.Println("An ObsError was found, which means your request sent to OBS was rejected with an error response.")
+ fmt.Println(obsError.Error())
+ } else {
+ fmt.Println("An Exception was found, which means the client encountered an internal problem when attempting to communicate with OBS, for example, the client was unable to access the network.")
+ fmt.Println(err)
+ }
+
+}
+
func runExamples() {
examples.RunBucketOperationsSample()
// examples.RunObjectOperationsSample()
@@ -1696,9 +1816,11 @@ func main() {
// completeMultipartUpload()
// abortMultipartUpload()
// putObject()
+ // putObjectWithProgress()
// putFile()
// getObjectMetadata()
// getObject()
+ // getObjectWithProgress()
// putObjectWithCallback()
// deleteBucket()
}
diff --git a/obs/auth.go b/obs/auth.go
index 00c74cf..8a964d5 100644
--- a/obs/auth.go
+++ b/obs/auth.go
@@ -20,7 +20,16 @@ import (
"time"
)
-func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string, params map[string]string,
+func setURLWithPolicy(bucketName, canonicalizedUrl string) string {
+ if strings.HasPrefix(canonicalizedUrl, "/"+bucketName+"/") {
+ canonicalizedUrl = canonicalizedUrl[len("/"+bucketName+"/"):]
+ } else if strings.HasPrefix(canonicalizedUrl, "/"+bucketName) {
+ canonicalizedUrl = canonicalizedUrl[len("/"+bucketName):]
+ }
+ return canonicalizedUrl
+}
+
+func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string, policy string, params map[string]string,
headers map[string][]string, expires int64) (requestURL string, err error) {
sh := obsClient.getSecurity()
isAkSkEmpty := sh.ak == "" || sh.sk == ""
@@ -31,6 +40,11 @@ func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string,
params[HEADER_STS_TOKEN_AMZ] = sh.securityToken
}
}
+
+ if policy != "" {
+ objectKey = ""
+ }
+
requestURL, canonicalizedURL := obsClient.conf.formatUrls(bucketName, objectKey, params, true)
parsedRequestURL, err := url.Parse(requestURL)
if err != nil {
@@ -93,7 +107,13 @@ func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string,
return "", parseDateErr
}
expires += date.Unix()
- headers[HEADER_DATE_CAMEL] = []string{Int64ToString(expires)}
+ if policy == "" {
+ headers[HEADER_DATE_CAMEL] = []string{Int64ToString(expires)}
+ } else {
+ policy = Base64Encode([]byte(policy))
+ headers[HEADER_DATE_CAMEL] = []string{policy}
+ canonicalizedURL = setURLWithPolicy(bucketName, canonicalizedURL)
+ }
stringToSign := getV2StringToSign(method, canonicalizedURL, headers, obsClient.conf.signature == SignatureObs)
signature := UrlEncode(Base64Encode(HmacSha1([]byte(sh.sk), []byte(stringToSign))), false)
@@ -107,7 +127,14 @@ func (obsClient ObsClient) doAuthTemporary(method, bucketName, objectKey string,
if obsClient.conf.signature != SignatureObs {
requestURL += "AWS"
}
- requestURL += fmt.Sprintf("AccessKeyId=%s&Expires=%d&Signature=%s", UrlEncode(sh.ak, false), expires, signature)
+ if policy == "" {
+ requestURL += fmt.Sprintf("AccessKeyId=%s&Expires=%d&Signature=%s", UrlEncode(sh.ak, false),
+ expires, signature)
+ return
+
+ }
+ requestURL += fmt.Sprintf("AccessKeyId=%s&Policy=%s&Signature=%s", UrlEncode(sh.ak, false),
+ UrlEncode(policy, false), signature)
}
}
diff --git a/obs/authV4.go b/obs/authV4.go
index 70abe96..258b342 100644
--- a/obs/authV4.go
+++ b/obs/authV4.go
@@ -76,7 +76,6 @@ func getV4StringToSign(method, canonicalizedURL, queryURL, scope, longDate, payl
_stringToSign := strings.Join(stringToSign, "")
- doLog(LEVEL_DEBUG, "The v4 auth stringToSign:\n%s", _stringToSign)
return _stringToSign
}
diff --git a/obs/callback.go b/obs/callback.go
index b2e41b7..1358e78 100644
--- a/obs/callback.go
+++ b/obs/callback.go
@@ -10,7 +10,6 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
-//nolint:structcheck, unused
package obs
import (
diff --git a/obs/client_object.go b/obs/client_object.go
index fecdb60..7835e98 100644
--- a/obs/client_object.go
+++ b/obs/client_object.go
@@ -223,12 +223,32 @@ func (obsClient ObsClient) GetObject(input *GetObjectInput, extensions ...extens
return nil, errors.New("GetObjectInput is nil")
}
output = &GetObjectOutput{}
- err = obsClient.doActionWithBucketAndKey("GetObject", HTTP_GET, input.Bucket, input.Key, input, output, extensions)
+ err = obsClient.doActionWithBucketAndKeyWithProgress(GET_OBJECT, HTTP_GET, input.Bucket, input.Key, input, output, extensions, nil)
if err != nil {
output = nil
- } else {
- ParseGetObjectOutput(output)
+ return
+ }
+
+ ParseGetObjectOutput(output)
+ listener := obsClient.getProgressListener(extensions)
+ if listener != nil {
+ output.Body = TeeReader(output.Body, output.ContentLength, listener, nil)
+ }
+ return
+}
+
+func (obsClient ObsClient) GetObjectWithoutProgress(input *GetObjectInput, extensions ...extensionOptions) (output *GetObjectOutput, err error) {
+ if input == nil {
+ return nil, errors.New("GetObjectInput is nil")
}
+ output = &GetObjectOutput{}
+ err = obsClient.doActionWithBucketAndKeyWithProgress(GET_OBJECT, HTTP_GET, input.Bucket, input.Key, input, output, extensions, nil)
+ if err != nil {
+ output = nil
+ return
+ }
+
+ ParseGetObjectOutput(output)
return
}
@@ -239,7 +259,7 @@ func (obsClient ObsClient) PutObject(input *PutObjectInput, extensions ...extens
}
if input.ContentType == "" && input.Key != "" {
- if contentType, ok := mimeTypes[strings.ToLower(input.Key[strings.LastIndex(input.Key, ".")+1:])]; ok {
+ if contentType, ok := GetContentType(input.Key); ok {
input.ContentType = contentType
}
}
@@ -253,24 +273,27 @@ func (obsClient ObsClient) PutObject(input *PutObjectInput, extensions ...extens
input.Body = &readerWrapper{reader: input.Body, totalCount: input.ContentLength}
}
}
+
+ listener := obsClient.getProgressListener(extensions)
if repeatable {
- err = obsClient.doActionWithBucketAndKey("PutObject", HTTP_PUT, input.Bucket, input.Key, input, output, extensions)
+ err = obsClient.doActionWithBucketAndKeyWithProgress(PUT_OBJECT, HTTP_PUT, input.Bucket, input.Key, input, output, extensions, listener)
} else {
- err = obsClient.doActionWithBucketAndKeyUnRepeatable("PutObject", HTTP_PUT, input.Bucket, input.Key, input, output, extensions)
+ err = obsClient.doActionWithBucketAndKeyUnRepeatableWithProgress(PUT_OBJECT, HTTP_PUT, input.Bucket, input.Key, input, output, extensions, listener)
}
if err != nil {
output = nil
- } else {
- ParsePutObjectOutput(output)
+ return
}
+ ParsePutObjectOutput(output)
+ output.ObjectUrl = fmt.Sprintf("%s/%s/%s", obsClient.conf.endpoint, input.Bucket, input.Key)
return
}
func (obsClient ObsClient) getContentType(input *PutObjectInput, sourceFile string) (contentType string) {
- if contentType, ok := mimeTypes[strings.ToLower(input.Key[strings.LastIndex(input.Key, ".")+1:])]; ok {
+ if contentType, ok := GetContentType(input.Key); ok {
return contentType
}
- if contentType, ok := mimeTypes[strings.ToLower(sourceFile[strings.LastIndex(sourceFile, ".")+1:])]; ok {
+ if contentType, ok := GetContentType(sourceFile); ok {
return contentType
}
return
@@ -349,14 +372,17 @@ func (obsClient ObsClient) PutFile(input *PutFileInput, extensions ...extensionO
if obsClient.isGetContentType(_input) {
_input.ContentType = obsClient.getContentType(_input, sourceFile)
}
-
+ listener := obsClient.getProgressListener(extensions)
output = &PutObjectOutput{}
- err = obsClient.doActionWithBucketAndKey("PutFile", HTTP_PUT, _input.Bucket, _input.Key, _input, output, extensions)
+ err = obsClient.doActionWithBucketAndKeyWithProgress(PUT_FILE, HTTP_PUT, _input.Bucket, _input.Key, _input, output, extensions, listener)
+
if err != nil {
output = nil
- } else {
- ParsePutObjectOutput(output)
+ return
}
+
+ ParsePutObjectOutput(output)
+ output.ObjectUrl = fmt.Sprintf("%s/%s/%s", obsClient.conf.endpoint, input.Bucket, input.Key)
return
}
@@ -405,17 +431,16 @@ func (obsClient ObsClient) AppendObject(input *AppendObjectInput, extensions ...
input.Body = &readerWrapper{reader: input.Body, totalCount: input.ContentLength}
}
}
+ listener := obsClient.getProgressListener(extensions)
+
if repeatable {
- err = obsClient.doActionWithBucketAndKey("AppendObject", HTTP_POST, input.Bucket, input.Key, input, output, extensions)
+ err = obsClient.doActionWithBucketAndKeyWithProgress(APPEND_OBJECT, HTTP_POST, input.Bucket, input.Key, input, output, extensions, listener)
} else {
- err = obsClient.doActionWithBucketAndKeyUnRepeatable("AppendObject", HTTP_POST, input.Bucket, input.Key, input, output, extensions)
+ err = obsClient.doActionWithBucketAndKeyUnRepeatableWithProgress(APPEND_OBJECT, HTTP_POST, input.Bucket, input.Key, input, output, extensions, listener)
}
- if err != nil {
+
+ if err != nil || ParseAppendObjectOutput(output) != nil {
output = nil
- } else {
- if err = ParseAppendObjectOutput(output); err != nil {
- output = nil
- }
}
return
}
diff --git a/obs/conf.go b/obs/conf.go
index e4e27c9..74a3ad8 100644
--- a/obs/conf.go
+++ b/obs/conf.go
@@ -21,10 +21,14 @@ import (
"net"
"net/http"
"net/url"
+ "os"
"sort"
"strconv"
"strings"
+ "sync"
"time"
+
+ "golang.org/x/net/http/httpproxy"
)
type urlHolder struct {
@@ -49,6 +53,8 @@ type config struct {
finalTimeout int
maxRetryCount int
proxyURL string
+ noProxyURL string
+ proxyFromEnv bool
maxConnsPerHost int
pemCerts []byte
transport *http.Transport
@@ -58,6 +64,10 @@ type config struct {
maxRedirectCount int
userAgent string
enableCompression bool
+ progressListener ProgressListener
+
+ customProxyOnce sync.Once
+ customProxyFuncValue func(*url.URL) (*url.URL, error)
}
func (conf config) String() string {
@@ -109,6 +119,20 @@ func WithProxyUrl(proxyURL string) configurer {
}
}
+// WithNoProxyUrl is a configurer for ObsClient to set HTTP no_proxy.
+func WithNoProxyUrl(noProxyURL string) configurer {
+ return func(conf *config) {
+ conf.noProxyURL = noProxyURL
+ }
+}
+
+// WithProxyFromEnv is a configurer for ObsClient to get proxy from evironment.
+func WithProxyFromEnv(proxyFromEnv bool) configurer {
+ return func(conf *config) {
+ conf.proxyFromEnv = proxyFromEnv
+ }
+}
+
// WithMaxConnections is a configurer for ObsClient to set the maximum number of idle HTTP connections.
func WithMaxConnections(maxConnsPerHost int) configurer {
return func(conf *config) {
@@ -342,13 +366,10 @@ func (conf *config) getTransport() error {
ResponseHeaderTimeout: time.Second * time.Duration(conf.headerTimeout),
IdleConnTimeout: time.Second * time.Duration(conf.idleConnTimeout),
}
-
if conf.proxyURL != "" {
- proxyURL, err := url.Parse(conf.proxyURL)
- if err != nil {
- return err
- }
- conf.transport.Proxy = http.ProxyURL(proxyURL)
+ conf.transport.Proxy = conf.customProxyFromEnvironment
+ } else if conf.proxyFromEnv {
+ conf.transport.Proxy = http.ProxyFromEnvironment
}
tlsConfig := &tls.Config{InsecureSkipVerify: !conf.sslVerify}
@@ -365,6 +386,24 @@ func (conf *config) getTransport() error {
return nil
}
+func (conf *config) customProxyFromEnvironment(req *http.Request) (*url.URL, error) {
+ url, err := conf.customProxyFunc()(req.URL)
+ return url, err
+}
+
+func (conf *config) customProxyFunc() func(*url.URL) (*url.URL, error) {
+ conf.customProxyOnce.Do(func() {
+ customhttpproxy := &httpproxy.Config{
+ HTTPProxy: conf.proxyURL,
+ HTTPSProxy: conf.proxyURL,
+ NoProxy: conf.noProxyURL,
+ CGI: os.Getenv("REQUEST_METHOD") != "",
+ }
+ conf.customProxyFuncValue = customhttpproxy.ProxyFunc()
+ })
+ return conf.customProxyFuncValue
+}
+
func checkRedirectFunc(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}
@@ -503,3 +542,20 @@ func getQueryURL(key, value string) string {
queryURL += value
return queryURL
}
+
+var once sync.Once
+
+func (obsClient ObsClient) GetClientConfigure(extensions []extensionOptions) *config {
+ once.Do(func() {
+ for _, extension := range extensions {
+ if configure, ok := extension.(configurer); ok {
+ configure(obsClient.conf)
+ }
+ }
+ })
+ return obsClient.conf
+}
+
+func (obsClient ObsClient) getProgressListener(extensions []extensionOptions) ProgressListener {
+ return obsClient.GetClientConfigure(extensions).progressListener
+}
diff --git a/obs/const.go b/obs/const.go
index b04db63..acb1e07 100644
--- a/obs/const.go
+++ b/obs/const.go
@@ -49,6 +49,8 @@ const (
HEADER_GRANT_READ_DELIVERED_OBS = "grant-read-delivered"
HEADER_GRANT_FULL_CONTROL_DELIVERED_OBS = "grant-full-control-delivered"
HEADER_REQUEST_ID = "request-id"
+ HEADER_ERROR_CODE = "error-code"
+ HEADER_ERROR_MESSAGE = "error-message"
HEADER_BUCKET_REGION = "bucket-region"
HEADER_ACCESS_CONRTOL_ALLOW_ORIGIN = "access-control-allow-origin"
HEADER_ACCESS_CONRTOL_ALLOW_HEADERS = "access-control-allow-headers"
@@ -130,6 +132,7 @@ const (
HEADER_CONTENT_ENCODING_CAMEL = "Content-Encoding"
HEADER_CONTENT_LANGUAGE_CAMEL = "Content-Language"
HEADER_EXPIRES_CAMEL = "Expires"
+ HEADER_ACCEPT_ENCODING = "Accept-Encoding"
PARAM_VERSION_ID = "versionId"
PARAM_RESPONSE_CONTENT_TYPE = "response-content-type"
@@ -157,7 +160,6 @@ const (
DEFAULT_MAX_RETRY_COUNT = 3
DEFAULT_MAX_REDIRECT_COUNT = 3
DEFAULT_MAX_CONN_PER_HOST = 1000
- EMPTY_CONTENT_SHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD"
LONG_DATE_FORMAT = "20060102T150405Z"
SHORT_DATE_FORMAT = "20060102"
@@ -195,6 +197,11 @@ const (
MIN_PART_SIZE = 100 * 1024
DEFAULT_PART_SIZE = 9 * 1024 * 1024
MAX_PART_NUM = 10000
+
+ GET_OBJECT = "GetObject"
+ PUT_OBJECT = "PutObject"
+ PUT_FILE = "PutFile"
+ APPEND_OBJECT = "AppendObject"
)
var (
@@ -224,6 +231,16 @@ var (
"if-none-match": true,
"last-modified": true,
"content-range": true,
+ "accept-encoding": true,
+ }
+
+ allowedLogResponseHTTPHeaderNames = map[string]bool{
+ "content-type": true,
+ "etag": true,
+ "connection": true,
+ "content-length": true,
+ "date": true,
+ "server": true,
}
allowedResourceParameterNames = map[string]bool{
diff --git a/obs/convert.go b/obs/convert.go
index f08dc42..97bedf5 100644
--- a/obs/convert.go
+++ b/obs/convert.go
@@ -348,11 +348,24 @@ func convertTransitionsToXML(transitions []Transition, isObs bool) string {
return ""
}
+func converLifeCycleFilterToXML(filter LifecycleFilter) string {
+ if filter.Prefix == "" && len(filter.Tags) == 0 {
+ return ""
+ }
+ data, err := TransToXml(filter)
+ if err != nil {
+ return ""
+ }
+ return string(data)
+}
+
func convertExpirationToXML(expiration Expiration) string {
if expiration.Days > 0 {
return fmt.Sprintf("%d", expiration.Days)
} else if !expiration.Date.IsZero() {
return fmt.Sprintf("%s", expiration.Date.UTC().Format(ISO8601_MIDNIGHT_DATE_FORMAT))
+ } else if expiration.ExpiredObjectDeleteMarker != "" {
+ return fmt.Sprintf("%s", expiration.ExpiredObjectDeleteMarker)
}
return ""
}
@@ -385,6 +398,13 @@ func convertNoncurrentVersionExpirationToXML(noncurrentVersionExpiration Noncurr
return ""
}
+func convertAbortIncompleteMultipartUploadToXML(abortIncompleteMultipartUpload AbortIncompleteMultipartUpload) string {
+ if abortIncompleteMultipartUpload.DaysAfterInitiation > 0 {
+ return fmt.Sprintf("%d", abortIncompleteMultipartUpload.DaysAfterInitiation)
+ }
+ return ""
+}
+
// ConvertLifecyleConfigurationToXml converts BucketLifecyleConfiguration value to XML data and returns it
func ConvertLifecyleConfigurationToXml(input BucketLifecyleConfiguration, returnMd5 bool, isObs bool) (data string, md5 string) {
xml := make([]string, 0, 2+len(input.LifecycleRules)*9)
@@ -396,8 +416,13 @@ func ConvertLifecyleConfigurationToXml(input BucketLifecyleConfiguration, return
xml = append(xml, fmt.Sprintf("%s", lifecyleRuleID))
}
lifecyleRulePrefix := XmlTranscoding(lifecyleRule.Prefix)
- xml = append(xml, fmt.Sprintf("%s", lifecyleRulePrefix))
+ if lifecyleRulePrefix != "" {
+ xml = append(xml, fmt.Sprintf("%s", lifecyleRulePrefix))
+ }
xml = append(xml, fmt.Sprintf("%s", lifecyleRule.Status))
+ if ret := converLifeCycleFilterToXML(lifecyleRule.Filter); ret != "" {
+ xml = append(xml, ret)
+ }
if ret := convertTransitionsToXML(lifecyleRule.Transitions, isObs); ret != "" {
xml = append(xml, ret)
}
@@ -410,6 +435,9 @@ func ConvertLifecyleConfigurationToXml(input BucketLifecyleConfiguration, return
if ret := convertNoncurrentVersionExpirationToXML(lifecyleRule.NoncurrentVersionExpiration); ret != "" {
xml = append(xml, ret)
}
+ if ret := convertAbortIncompleteMultipartUploadToXML(lifecyleRule.AbortIncompleteMultipartUpload); ret != "" {
+ xml = append(xml, ret)
+ }
xml = append(xml, "")
}
xml = append(xml, "")
@@ -749,6 +777,17 @@ func ParseCopyPartOutput(output *CopyPartOutput) {
output.SseHeader = parseSseHeader(output.ResponseHeaders)
}
+// ParseStringToAvailableZoneType converts string value to AvailableZoneType value and returns it
+func ParseStringToAvailableZoneType(value string) (ret AvailableZoneType) {
+ switch value {
+ case "3az":
+ ret = AvailableZoneMultiAz
+ default:
+ ret = ""
+ }
+ return
+}
+
// ParseGetBucketMetadataOutput sets GetBucketMetadataOutput field values with response headers
func ParseGetBucketMetadataOutput(output *GetBucketMetadataOutput) {
output.AllowOrigin, output.AllowHeader, output.AllowMethod, output.ExposeHeader, output.MaxAgeSeconds = parseCorsHeader(output.BaseModel)
@@ -769,7 +808,7 @@ func ParseGetBucketMetadataOutput(output *GetBucketMetadataOutput) {
output.Epid = ret[0]
}
if ret, ok := output.ResponseHeaders[HEADER_AZ_REDUNDANCY]; ok {
- output.AZRedundancy = ret[0]
+ output.AZRedundancy = ParseStringToAvailableZoneType(ret[0])
}
if ret, ok := output.ResponseHeaders[HEADER_BUCKET_REDUNDANCY]; ok {
output.BucketRedundancy = parseStringToBucketRedundancy(ret[0])
@@ -866,9 +905,6 @@ func ParseGetObjectOutput(output *GetObjectOutput) {
func ConvertRequestToIoReaderV2(req interface{}) (io.Reader, string, error) {
data, err := TransToXml(req)
if err == nil {
- if isDebugLogEnabled() {
- doLog(LEVEL_DEBUG, "Do http request with data: %s", string(data))
- }
return bytes.NewReader(data), Base64Md5(data), nil
}
return nil, "", err
@@ -878,9 +914,6 @@ func ConvertRequestToIoReaderV2(req interface{}) (io.Reader, string, error) {
func ConvertRequestToIoReader(req interface{}) (io.Reader, error) {
body, err := TransToXml(req)
if err == nil {
- if isDebugLogEnabled() {
- doLog(LEVEL_DEBUG, "Do http request with data: %s", string(body))
- }
return bytes.NewReader(body), nil
}
return nil, err
@@ -965,6 +998,13 @@ func ParseResponseToObsError(resp *http.Response, isObs bool) error {
doLog(LEVEL_WARN, "Parse response to BaseModel with error: %v", respError)
}
obsError.Status = resp.Status
+ responseHeaders := cleanHeaderPrefix(resp.Header)
+ if values, ok := responseHeaders[HEADER_ERROR_MESSAGE]; ok {
+ obsError.Message = values[0]
+ }
+ if values, ok := responseHeaders[HEADER_ERROR_CODE]; ok {
+ obsError.Code = values[0]
+ }
return obsError
}
diff --git a/obs/extension.go b/obs/extension.go
index aca9535..baa132c 100644
--- a/obs/extension.go
+++ b/obs/extension.go
@@ -21,6 +21,11 @@ import (
type extensionOptions interface{}
type extensionHeaders func(headers map[string][]string, isObs bool) error
+func WithProgress(progressListener ProgressListener) configurer {
+ return func(conf *config) {
+ conf.progressListener = progressListener
+ }
+}
func setHeaderPrefix(key string, value string) extensionHeaders {
return func(headers map[string][]string, isObs bool) error {
if strings.TrimSpace(value) == "" {
diff --git a/obs/http.go b/obs/http.go
index 2ca58dc..58c8010 100644
--- a/obs/http.go
+++ b/obs/http.go
@@ -17,89 +17,119 @@ import (
"errors"
"fmt"
"io"
+ "io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
"os"
+ "strconv"
"strings"
"time"
)
func prepareHeaders(headers map[string][]string, meta bool, isObs bool) map[string][]string {
_headers := make(map[string][]string, len(headers))
- if headers != nil {
- for key, value := range headers {
- key = strings.TrimSpace(key)
- if key == "" {
+ for key, value := range headers {
+ key = strings.TrimSpace(key)
+ if key == "" {
+ continue
+ }
+ _key := strings.ToLower(key)
+ if _, ok := allowedRequestHTTPHeaderMetadataNames[_key]; !ok && !strings.HasPrefix(key, HEADER_PREFIX) && !strings.HasPrefix(key, HEADER_PREFIX_OBS) {
+ if !meta {
continue
}
- _key := strings.ToLower(key)
- if _, ok := allowedRequestHTTPHeaderMetadataNames[_key]; !ok && !strings.HasPrefix(key, HEADER_PREFIX) && !strings.HasPrefix(key, HEADER_PREFIX_OBS) {
- if !meta {
- continue
- }
- if !isObs {
- _key = HEADER_PREFIX_META + _key
- } else {
- _key = HEADER_PREFIX_META_OBS + _key
- }
+ if !isObs {
+ _key = HEADER_PREFIX_META + _key
} else {
- _key = key
+ _key = HEADER_PREFIX_META_OBS + _key
}
- _headers[_key] = value
+ } else {
+ _key = key
}
+ _headers[_key] = value
}
return _headers
}
+func (obsClient ObsClient) checkParamsWithBucketName(bucketName string) bool {
+ return strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname
+}
+
+func (obsClient ObsClient) checkParamsWithObjectKey(objectKey string) bool {
+ return strings.TrimSpace(objectKey) == ""
+}
+
func (obsClient ObsClient) doActionWithoutBucket(action, method string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- return obsClient.doAction(action, method, "", "", input, output, true, true, extensions)
+ return obsClient.doAction(action, method, "", "", input, output, true, true, extensions, nil)
}
func (obsClient ObsClient) doActionWithBucketV2(action, method, bucketName string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
+ if obsClient.checkParamsWithBucketName(bucketName) {
return errors.New("Bucket is empty")
}
- return obsClient.doAction(action, method, bucketName, "", input, output, false, true, extensions)
+ return obsClient.doAction(action, method, bucketName, "", input, output, false, true, extensions, nil)
}
func (obsClient ObsClient) doActionWithBucket(action, method, bucketName string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
+ if obsClient.checkParamsWithBucketName(bucketName) {
return errors.New("Bucket is empty")
}
- return obsClient.doAction(action, method, bucketName, "", input, output, true, true, extensions)
+ return obsClient.doAction(action, method, bucketName, "", input, output, true, true, extensions, nil)
}
func (obsClient ObsClient) doActionWithBucketAndKey(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- return obsClient._doActionWithBucketAndKey(action, method, bucketName, objectKey, input, output, true, extensions)
+ if obsClient.checkParamsWithBucketName(bucketName) {
+ return errors.New("Bucket is empty")
+ }
+ if obsClient.checkParamsWithObjectKey(objectKey) {
+ return errors.New("Key is empty")
+ }
+ return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, true, extensions, nil)
+}
+
+func (obsClient ObsClient) doActionWithBucketAndKeyWithProgress(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions, listener ProgressListener) error {
+ if obsClient.checkParamsWithBucketName(bucketName) {
+ return errors.New("Bucket is empty")
+ }
+ if obsClient.checkParamsWithObjectKey(objectKey) {
+ return errors.New("Key is empty")
+ }
+ return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, true, extensions, listener)
}
func (obsClient ObsClient) doActionWithBucketAndKeyV2(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
+ if obsClient.checkParamsWithBucketName(bucketName) {
return errors.New("Bucket is empty")
}
- if strings.TrimSpace(objectKey) == "" {
+ if obsClient.checkParamsWithObjectKey(objectKey) {
return errors.New("Key is empty")
}
- return obsClient.doAction(action, method, bucketName, objectKey, input, output, false, true, extensions)
+ return obsClient.doAction(action, method, bucketName, objectKey, input, output, false, true, extensions, nil)
}
func (obsClient ObsClient) doActionWithBucketAndKeyUnRepeatable(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions) error {
- return obsClient._doActionWithBucketAndKey(action, method, bucketName, objectKey, input, output, false, extensions)
+ if obsClient.checkParamsWithBucketName(bucketName) {
+ return errors.New("Bucket is empty")
+ }
+ if obsClient.checkParamsWithObjectKey(objectKey) {
+ return errors.New("Key is empty")
+ }
+ return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, false, extensions, nil)
}
-func (obsClient ObsClient) _doActionWithBucketAndKey(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, repeatable bool, extensions []extensionOptions) error {
- if strings.TrimSpace(bucketName) == "" && !obsClient.conf.cname {
+func (obsClient ObsClient) doActionWithBucketAndKeyUnRepeatableWithProgress(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, extensions []extensionOptions, listener ProgressListener) error {
+ if obsClient.checkParamsWithBucketName(bucketName) {
return errors.New("Bucket is empty")
}
- if strings.TrimSpace(objectKey) == "" {
+ if obsClient.checkParamsWithObjectKey(objectKey) {
return errors.New("Key is empty")
}
- return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, repeatable, extensions)
+ return obsClient.doAction(action, method, bucketName, objectKey, input, output, true, false, extensions, listener)
}
-func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, xmlResult bool, repeatable bool, extensions []extensionOptions) error {
+func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string, input ISerializable, output IBaseModel, xmlResult bool, repeatable bool, extensions []extensionOptions, listener ProgressListener) error {
var resp *http.Response
var respError error
@@ -130,22 +160,8 @@ func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string
}
}
- switch method {
- case HTTP_GET:
- resp, respError = obsClient.doHTTPGet(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_POST:
- resp, respError = obsClient.doHTTPPost(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_PUT:
- resp, respError = obsClient.doHTTPPut(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_DELETE:
- resp, respError = obsClient.doHTTPDelete(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_HEAD:
- resp, respError = obsClient.doHTTPHead(bucketName, objectKey, params, headers, data, repeatable)
- case HTTP_OPTIONS:
- resp, respError = obsClient.doHTTPOptions(bucketName, objectKey, params, headers, data, repeatable)
- default:
- respError = errors.New("Unexpect http method error")
- }
+ resp, respError = obsClient.doHTTPRequest(method, bucketName, objectKey, params, headers, data, repeatable, listener)
+
if respError == nil && output != nil {
respError = HandleHttpResponse(action, headers, output, resp, xmlResult, isObs)
} else {
@@ -159,34 +175,9 @@ func (obsClient ObsClient) doAction(action, method, bucketName, objectKey string
return respError
}
-func (obsClient ObsClient) doHTTPGet(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_GET, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
-}
-
-func (obsClient ObsClient) doHTTPHead(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_HEAD, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
-}
-
-func (obsClient ObsClient) doHTTPOptions(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_OPTIONS, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
-}
-
-func (obsClient ObsClient) doHTTPDelete(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_DELETE, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable)
-}
-
-func (obsClient ObsClient) doHTTPPut(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_PUT, bucketName, objectKey, params, prepareHeaders(headers, true, obsClient.conf.signature == SignatureObs), data, repeatable)
-}
-
-func (obsClient ObsClient) doHTTPPost(bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (*http.Response, error) {
- return obsClient.doHTTP(HTTP_POST, bucketName, objectKey, params, prepareHeaders(headers, true, obsClient.conf.signature == SignatureObs), data, repeatable)
+func (obsClient ObsClient) doHTTPRequest(method, bucketName, objectKey string, params map[string]string,
+ headers map[string][]string, data interface{}, repeatable bool, listener ProgressListener) (*http.Response, error) {
+ return obsClient.doHTTP(method, bucketName, objectKey, params, prepareHeaders(headers, false, obsClient.conf.signature == SignatureObs), data, repeatable, listener)
}
func prepareAgentHeader(clientUserAgent string) string {
@@ -203,7 +194,7 @@ func (obsClient ObsClient) getSignedURLResponse(action string, output IBaseModel
respError = err
resp = nil
} else {
- doLog(LEVEL_DEBUG, "Response headers: %v", resp.Header)
+ doLog(LEVEL_DEBUG, "Response headers: %s", logResponseHeader(resp.Header))
if resp.StatusCode >= 300 {
respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs)
msg = resp.Status
@@ -293,7 +284,7 @@ func prepareData(headers map[string][]string, data interface{}) (io.Reader, erro
var _data io.Reader
if data != nil {
if dataStr, ok := data.(string); ok {
- doLog(LEVEL_DEBUG, "Do http request with string: %s", dataStr)
+ doLog(LEVEL_DEBUG, "Do http request with string")
headers["Content-Length"] = []string{IntToString(len(dataStr))}
_data = strings.NewReader(dataStr)
} else if dataByte, ok := data.([]byte); ok {
@@ -360,7 +351,7 @@ func logHeaders(headers map[string][]string, signature SignatureType) {
} else if securityToken, isSecurityToken = headers[HEADER_STS_TOKEN_OBS]; isSecurityToken {
headers[HEADER_STS_TOKEN_OBS] = []string{"******"}
}
- doLog(LEVEL_DEBUG, "Request headers: %v", headers)
+ doLog(LEVEL_DEBUG, "Request headers: %s", logRequestHeader(headers))
headers[HEADER_AUTH_CAMEL] = auth
if isSecurityToken {
if signature == SignatureObs {
@@ -475,8 +466,32 @@ func prepareRetry(resp *http.Response, headers map[string][]string, _data io.Rea
return _data, resp, nil
}
+// handleBody handles request body
+func handleBody(req *http.Request, body io.Reader, listener ProgressListener, tracker *readerTracker) {
+ reader := body
+ readerLen, err := GetReaderLen(reader)
+ if err == nil {
+ req.ContentLength = readerLen
+ }
+ if req.ContentLength > 0 {
+ req.Header.Set(HEADER_CONTENT_LENGTH_CAMEL, strconv.FormatInt(req.ContentLength, 10))
+ }
+
+ if reader != nil {
+ reader = TeeReader(reader, req.ContentLength, listener, tracker)
+ }
+
+ // HTTP body
+ rc, ok := reader.(io.ReadCloser)
+ if !ok && reader != nil {
+ rc = ioutil.NopCloser(reader)
+ }
+
+ req.Body = rc
+}
+
func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params map[string]string,
- headers map[string][]string, data interface{}, repeatable bool) (resp *http.Response, respError error) {
+ headers map[string][]string, data interface{}, repeatable bool, listener ProgressListener) (resp *http.Response, respError error) {
bucketName = strings.TrimSpace(bucketName)
@@ -494,6 +509,9 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m
var lastRequest *http.Request
redirectFlag := false
+
+ tracker := &readerTracker{completedBytes: 0}
+
for i, redirectCount := 0, 0; i <= maxRetryCount; i++ {
req, err := obsClient.getRequest(redirectURL, requestURL, redirectFlag, _data,
method, bucketName, objectKey, params, headers)
@@ -501,10 +519,16 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m
return nil, err
}
+ handleBody(req, _data, listener, tracker)
+
logHeaders(headers, obsClient.conf.signature)
lastRequest = prepareReq(headers, req, lastRequest, obsClient.conf.userAgent)
+ // Transfer started
+ event := newProgressEvent(TransferStartedEvent, 0, req.ContentLength)
+ publishProgress(listener, event)
+
start := GetCurrentTimestamp()
resp, err = obsClient.httpClient.Do(req)
doLog(LEVEL_INFO, "Do http request cost %d ms", (GetCurrentTimestamp() - start))
@@ -518,11 +542,14 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m
break
}
} else {
- doLog(LEVEL_DEBUG, "Response headers: %v", resp.Header)
+ doLog(LEVEL_DEBUG, "Response headers: %s", logResponseHeader(resp.Header))
if resp.StatusCode < 300 {
respError = nil
break
} else if canNotRetry(repeatable, resp.StatusCode) {
+ event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength)
+ publishProgress(listener, event)
+
respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs)
resp = nil
break
@@ -564,6 +591,8 @@ func (obsClient ObsClient) doHTTP(method, bucketName, objectKey string, params m
respError = ParseResponseToObsError(resp, obsClient.conf.signature == SignatureObs)
resp = nil
}
+ event = newProgressEvent(TransferFailedEvent, tracker.completedBytes, req.ContentLength)
+ publishProgress(listener, event)
}
}
return
diff --git a/obs/log.go b/obs/log.go
index 20a0631..9896926 100644
--- a/obs/log.go
+++ b/obs/log.go
@@ -15,6 +15,7 @@ package obs
import (
"fmt"
"log"
+ "net/http"
"os"
"path/filepath"
"runtime"
@@ -285,6 +286,7 @@ func initLogFile(_fullPath string) (os.FileInfo, *os.File, error) {
return nil, nil, err
}
}
+
return stat, fd, nil
}
@@ -317,6 +319,10 @@ func doLog(level Level, format string, v ...interface{}) {
msg = fmt.Sprintf("%s:%d|%s", file, line, msg)
}
prefix := logLevelMap[level]
+ defer func() {
+ _ = recover()
+ // ignore ch closed error
+ }()
if consoleLogger != nil {
consoleLogger.Printf("%s%s", prefix, msg)
}
@@ -332,3 +338,39 @@ func checkAndLogErr(err error, level Level, format string, v ...interface{}) {
doLog(level, format, v...)
}
}
+
+func logResponseHeader(respHeader http.Header) string {
+ resp := make([]string, 0, len(respHeader)+1)
+ for key, value := range respHeader {
+ key = strings.TrimSpace(key)
+ if key == "" {
+ continue
+ }
+ if strings.HasPrefix(key, HEADER_PREFIX) || strings.HasPrefix(key, HEADER_PREFIX_OBS) {
+ key = key[len(HEADER_PREFIX):]
+ }
+ _key := strings.ToLower(key)
+ if _, ok := allowedLogResponseHTTPHeaderNames[_key]; ok {
+ resp = append(resp, fmt.Sprintf("%s: [%s]", key, value[0]))
+ }
+ if _key == HEADER_REQUEST_ID {
+ resp = append(resp, fmt.Sprintf("%s: [%s]", key, value[0]))
+ }
+ }
+ return strings.Join(resp, " ")
+}
+
+func logRequestHeader(reqHeader http.Header) string {
+ resp := make([]string, 0, len(reqHeader)+1)
+ for key, value := range reqHeader {
+ key = strings.TrimSpace(key)
+ if key == "" {
+ continue
+ }
+ _key := strings.ToLower(key)
+ if _, ok := allowedRequestHTTPHeaderMetadataNames[_key]; ok {
+ resp = append(resp, fmt.Sprintf("%s: [%s]", key, value[0]))
+ }
+ }
+ return strings.Join(resp, " ")
+}
diff --git a/obs/mime.go b/obs/mime.go
index 8fb11b5..3fba97a 100644
--- a/obs/mime.go
+++ b/obs/mime.go
@@ -391,9 +391,13 @@ var mimeTypes = map[string]string{
"zip": "application/zip",
"dotx": "application/vnd.openxmlformats-officedocument.wordprocessingml.template",
"wps": "application/vnd.ms-works",
- "wpt": "x-lml/x-gps",
"pptm": "application/vnd.ms-powerpoint.presentation.macroenabled.12",
"heic": "image/heic",
"mkv": "video/x-matroska",
"raw": "image/x-panasonic-raw",
+ "webp": "image/webp",
+ "3gp": "audio/3gpp",
+ "3g2": "audio/3gpp2",
+ "weba": "audio/webm",
+ "woff2": "font/woff2",
}
diff --git a/obs/model_base.go b/obs/model_base.go
index 5faabd3..611e521 100644
--- a/obs/model_base.go
+++ b/obs/model_base.go
@@ -247,9 +247,10 @@ type Transition struct {
// Expiration defines expiration property in LifecycleRule
type Expiration struct {
- XMLName xml.Name `xml:"Expiration"`
- Date time.Time `xml:"Date,omitempty"`
- Days int `xml:"Days,omitempty"`
+ XMLName xml.Name `xml:"Expiration"`
+ Date time.Time `xml:"Date,omitempty"`
+ Days int `xml:"Days,omitempty"`
+ ExpiredObjectDeleteMarker string `xml:"ExpiredObjectDeleteMarker,omitempty"`
}
// NoncurrentVersionTransition defines noncurrentVersion transition property in LifecycleRule
@@ -265,15 +266,29 @@ type NoncurrentVersionExpiration struct {
NoncurrentDays int `xml:"NoncurrentDays"`
}
+// AbortIncompleteMultipartUpload defines abortIncomplete expiration property in LifecycleRule
+type AbortIncompleteMultipartUpload struct {
+ XMLName xml.Name `xml:"AbortIncompleteMultipartUpload"`
+ DaysAfterInitiation int `xml:"DaysAfterInitiation"`
+}
+
// LifecycleRule defines lifecycle rule
type LifecycleRule struct {
- ID string `xml:"ID,omitempty"`
- Prefix string `xml:"Prefix"`
- Status RuleStatusType `xml:"Status"`
- Transitions []Transition `xml:"Transition,omitempty"`
- Expiration Expiration `xml:"Expiration,omitempty"`
- NoncurrentVersionTransitions []NoncurrentVersionTransition `xml:"NoncurrentVersionTransition,omitempty"`
- NoncurrentVersionExpiration NoncurrentVersionExpiration `xml:"NoncurrentVersionExpiration,omitempty"`
+ ID string `xml:"ID,omitempty"`
+ Prefix string `xml:"Prefix"`
+ Status RuleStatusType `xml:"Status"`
+ Transitions []Transition `xml:"Transition,omitempty"`
+ Expiration Expiration `xml:"Expiration,omitempty"`
+ NoncurrentVersionTransitions []NoncurrentVersionTransition `xml:"NoncurrentVersionTransition,omitempty"`
+ NoncurrentVersionExpiration NoncurrentVersionExpiration `xml:"NoncurrentVersionExpiration,omitempty"`
+ AbortIncompleteMultipartUpload AbortIncompleteMultipartUpload `xml:"AbortIncompleteMultipartUpload,omitempty"`
+ Filter LifecycleFilter `xml:"Filter,omitempty"`
+}
+
+type LifecycleFilter struct {
+ XMLName xml.Name `xml:"Filter"`
+ Prefix string `xml:"And>Prefix,omitempty"`
+ Tags []Tag `xml:"And>Tag,omitempty"`
}
// BucketEncryptionConfiguration defines the bucket encryption configuration
diff --git a/obs/model_bucket.go b/obs/model_bucket.go
index 866e656..c3033fd 100644
--- a/obs/model_bucket.go
+++ b/obs/model_bucket.go
@@ -55,14 +55,20 @@ type Domain struct {
type ListBucketsInput struct {
QueryLocation bool
BucketType BucketType
+ MaxKeys int
+ Marker string
}
// ListBucketsOutput is the result of ListBuckets function
type ListBucketsOutput struct {
BaseModel
- XMLName xml.Name `xml:"ListAllMyBucketsResult"`
- Owner Owner `xml:"Owner"`
- Buckets []Bucket `xml:"Buckets>Bucket"`
+ XMLName xml.Name `xml:"ListAllMyBucketsResult"`
+ Owner Owner `xml:"Owner"`
+ Buckets []Bucket `xml:"Buckets>Bucket"`
+ IsTruncated bool `xml:"IsTruncated"`
+ Marker string `xml:"Marker"`
+ NextMarker string `xml:"NextMarker"`
+ MaxKeys int `xml:"MaxKeys"`
}
// CreateBucketInput is the input parameter of CreateBucket function
@@ -222,15 +228,11 @@ type SetObjectMetadataInput struct {
Key string
VersionId string
MetadataDirective MetadataDirectiveType
- CacheControl string
- ContentDisposition string
- ContentEncoding string
- ContentLanguage string
- ContentType string
Expires string
WebsiteRedirectLocation string
StorageClass StorageClassType
Metadata map[string]string
+ HttpHeader
}
//SetObjectMetadataOutput is the result of SetObjectMetadata function
@@ -260,7 +262,7 @@ type GetBucketMetadataOutput struct {
MaxAgeSeconds int
ExposeHeader string
Epid string
- AZRedundancy string
+ AZRedundancy AvailableZoneType
FSStatus FSStatusType
BucketRedundancy BucketRedundancyType
}
diff --git a/obs/model_header.go b/obs/model_header.go
index c76d9f0..65dab04 100644
--- a/obs/model_header.go
+++ b/obs/model_header.go
@@ -9,6 +9,7 @@
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
+
package obs
// ISseHeader defines the sse encryption header
diff --git a/obs/model_object.go b/obs/model_object.go
index b5fb0d0..9e8a084 100644
--- a/obs/model_object.go
+++ b/obs/model_object.go
@@ -194,6 +194,7 @@ type GetObjectInput struct {
GetObjectMetadataInput
IfMatch string
IfNoneMatch string
+ AcceptEncoding string
IfUnmodifiedSince time.Time
IfModifiedSince time.Time
RangeStart int64
@@ -233,6 +234,7 @@ type ObjectOperationInput struct {
Expires int64
SseHeader ISseHeader
Metadata map[string]string
+ HttpHeader
}
// PutObjectBasicInput defines the basic object operation properties
@@ -240,7 +242,6 @@ type PutObjectBasicInput struct {
ObjectOperationInput
ContentMD5 string
ContentLength int64
- HttpHeader
}
// PutObjectInput is the input parameter of PutObject function
diff --git a/obs/model_other.go b/obs/model_other.go
index 8ede31d..4d4a320 100644
--- a/obs/model_other.go
+++ b/obs/model_other.go
@@ -9,6 +9,7 @@
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
+
package obs
import (
@@ -20,6 +21,7 @@ type CreateSignedUrlInput struct {
Method HttpMethodType
Bucket string
Key string
+ Policy string
SubResource SubResourceType
Expires int
Headers map[string]string
diff --git a/obs/model_part.go b/obs/model_part.go
index f9503fd..66f485e 100644
--- a/obs/model_part.go
+++ b/obs/model_part.go
@@ -9,6 +9,7 @@
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
+
package obs
import (
diff --git a/obs/model_response.go b/obs/model_response.go
index a8db53e..ec2536c 100644
--- a/obs/model_response.go
+++ b/obs/model_response.go
@@ -9,6 +9,7 @@
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
+
package obs
import (
diff --git a/obs/progress.go b/obs/progress.go
new file mode 100644
index 0000000..89fecfc
--- /dev/null
+++ b/obs/progress.go
@@ -0,0 +1,101 @@
+package obs
+
+import (
+ "io"
+)
+
+type ProgressEventType int
+
+type ProgressEvent struct {
+ ConsumedBytes int64
+ TotalBytes int64
+ EventType ProgressEventType
+}
+
+const (
+ TransferStartedEvent ProgressEventType = 1 + iota
+ TransferDataEvent
+ TransferCompletedEvent
+ TransferFailedEvent
+)
+
+func newProgressEvent(eventType ProgressEventType, consumed, total int64) *ProgressEvent {
+ return &ProgressEvent{
+ ConsumedBytes: consumed,
+ TotalBytes: total,
+ EventType: eventType,
+ }
+}
+
+type ProgressListener interface {
+ ProgressChanged(event *ProgressEvent)
+}
+
+type readerTracker struct {
+ completedBytes int64
+}
+
+// publishProgress
+func publishProgress(listener ProgressListener, event *ProgressEvent) {
+ if listener != nil && event != nil {
+ listener.ProgressChanged(event)
+ }
+}
+
+type teeReader struct {
+ reader io.Reader
+ consumedBytes int64
+ totalBytes int64
+ tracker *readerTracker
+ listener ProgressListener
+}
+
+func TeeReader(reader io.Reader, totalBytes int64, listener ProgressListener, tracker *readerTracker) io.ReadCloser {
+ return &teeReader{
+ reader: reader,
+ consumedBytes: 0,
+ totalBytes: totalBytes,
+ tracker: tracker,
+ listener: listener,
+ }
+}
+
+func (t *teeReader) Read(p []byte) (n int, err error) {
+ n, err = t.reader.Read(p)
+
+ if err != nil && err != io.EOF {
+ event := newProgressEvent(TransferFailedEvent, t.consumedBytes, t.totalBytes)
+ publishProgress(t.listener, event)
+ }
+
+ if n > 0 {
+ t.consumedBytes += int64(n)
+
+ if t.listener != nil {
+ event := newProgressEvent(TransferDataEvent, t.consumedBytes, t.totalBytes)
+ publishProgress(t.listener, event)
+ }
+
+ if t.tracker != nil {
+ t.tracker.completedBytes = t.consumedBytes
+ }
+ }
+
+ if err == io.EOF {
+ event := newProgressEvent(TransferCompletedEvent, t.consumedBytes, t.totalBytes)
+ publishProgress(t.listener, event)
+ }
+
+ return
+}
+
+func (r *teeReader) Size() int64 {
+ return r.totalBytes
+}
+
+func (t *teeReader) Close() error {
+ if rc, ok := t.reader.(io.ReadCloser); ok {
+ return rc.Close()
+ }
+ return nil
+}
diff --git a/obs/provider.go b/obs/provider.go
index 2e485c1..297efa4 100644
--- a/obs/provider.go
+++ b/obs/provider.go
@@ -206,7 +206,6 @@ func (ecsSp *EcsSecurityProvider) getSecurity() securityHolder {
}
func getInternalTransport() *http.Transport {
-
timeout := 10
transport := &http.Transport{
Dial: func(network, addr string) (net.Conn, error) {
diff --git a/obs/temporary_createSignedUrl.go b/obs/temporary_createSignedUrl.go
index 5c27e64..14d7742 100644
--- a/obs/temporary_createSignedUrl.go
+++ b/obs/temporary_createSignedUrl.go
@@ -52,7 +52,7 @@ func (obsClient ObsClient) CreateSignedUrl(input *CreateSignedUrlInput, extensio
input.Expires = 300
}
- requestURL, err := obsClient.doAuthTemporary(string(input.Method), input.Bucket, input.Key, params, headers, int64(input.Expires))
+ requestURL, err := obsClient.doAuthTemporary(string(input.Method), input.Bucket, input.Key, input.Policy, params, headers, int64(input.Expires))
if err != nil {
return nil, err
}
diff --git a/obs/temporary_signedUrl.go b/obs/temporary_signedUrl.go
index be92782..c6d6f6f 100644
--- a/obs/temporary_signedUrl.go
+++ b/obs/temporary_signedUrl.go
@@ -508,7 +508,7 @@ func (obsClient ObsClient) GetObjectMetadataWithSignedUrl(signedUrl string, actu
// GetObjectWithSignedUrl downloads object with the specified signed url and signed request headers
func (obsClient ObsClient) GetObjectWithSignedUrl(signedUrl string, actualSignedRequestHeaders http.Header) (output *GetObjectOutput, err error) {
output = &GetObjectOutput{}
- err = obsClient.doHTTPWithSignedURL("GetObject", HTTP_GET, signedUrl, actualSignedRequestHeaders, nil, output, true)
+ err = obsClient.doHTTPWithSignedURL(GET_OBJECT, HTTP_GET, signedUrl, actualSignedRequestHeaders, nil, output, true)
if err != nil {
output = nil
} else {
@@ -520,7 +520,7 @@ func (obsClient ObsClient) GetObjectWithSignedUrl(signedUrl string, actualSigned
// PutObjectWithSignedUrl uploads an object to the specified bucket with the specified signed url and signed request headers and data
func (obsClient ObsClient) PutObjectWithSignedUrl(signedUrl string, actualSignedRequestHeaders http.Header, data io.Reader) (output *PutObjectOutput, err error) {
output = &PutObjectOutput{}
- err = obsClient.doHTTPWithSignedURL("PutObject", HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true)
+ err = obsClient.doHTTPWithSignedURL(PUT_OBJECT, HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true)
if err != nil {
output = nil
} else {
@@ -570,7 +570,7 @@ func (obsClient ObsClient) PutFileWithSignedUrl(signedUrl string, actualSignedRe
}
output = &PutObjectOutput{}
- err = obsClient.doHTTPWithSignedURL("PutObject", HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true)
+ err = obsClient.doHTTPWithSignedURL(PUT_FILE, HTTP_PUT, signedUrl, actualSignedRequestHeaders, data, output, true)
if err != nil {
output = nil
} else {
@@ -734,7 +734,7 @@ func (obsClient ObsClient) DeleteBucketEncryptionWithSignedURL(signedURL string,
// AppendObjectWithSignedUrl uploads an object to the specified bucket with the specified signed url and signed request headers and data
func (obsClient ObsClient) AppendObjectWithSignedURL(signedURL string, actualSignedRequestHeaders http.Header, data io.Reader) (output *AppendObjectOutput, err error) {
output = &AppendObjectOutput{}
- err = obsClient.doHTTPWithSignedURL("AppendObject", HTTP_POST, signedURL, actualSignedRequestHeaders, data, output, true)
+ err = obsClient.doHTTPWithSignedURL(APPEND_OBJECT, HTTP_POST, signedURL, actualSignedRequestHeaders, data, output, true)
if err != nil {
output = nil
} else {
diff --git a/obs/trait_base.go b/obs/trait_base.go
index 63bddb2..909c7a0 100644
--- a/obs/trait_base.go
+++ b/obs/trait_base.go
@@ -16,6 +16,10 @@ import (
"io"
)
+type IRepeatable interface {
+ Reset() error
+}
+
// IReadCloser defines interface with function: setReadCloser
type IReadCloser interface {
setReadCloser(body io.ReadCloser)
diff --git a/obs/trait_bucket.go b/obs/trait_bucket.go
index 97b66e8..2818084 100644
--- a/obs/trait_bucket.go
+++ b/obs/trait_bucket.go
@@ -18,6 +18,13 @@ import (
)
func (input ListBucketsInput) trans(isObs bool) (params map[string]string, headers map[string][]string, data interface{}, err error) {
+ params = make(map[string]string)
+ if input.MaxKeys > 0 {
+ params["max-keys"] = IntToString(input.MaxKeys)
+ }
+ if input.Marker != "" {
+ params["marker"] = input.Marker
+ }
headers = make(map[string][]string)
if input.QueryLocation && !isObs {
setHeaders(headers, HEADER_LOCATION_AMZ, []string{"true"}, isObs)
diff --git a/obs/trait_object.go b/obs/trait_object.go
index 7aaf425..48011ca 100644
--- a/obs/trait_object.go
+++ b/obs/trait_object.go
@@ -148,6 +148,9 @@ func (input GetObjectMetadataInput) trans(isObs bool) (params map[string]string,
}
func (input SetObjectMetadataInput) prepareContentHeaders(headers map[string][]string) {
+ if input.CacheControl != "" {
+ headers[HEADER_CACHE_CONTROL_CAMEL] = []string{input.CacheControl}
+ }
if input.ContentDisposition != "" {
headers[HEADER_CONTENT_DISPOSITION_CAMEL] = []string{input.ContentDisposition}
}
@@ -157,10 +160,15 @@ func (input SetObjectMetadataInput) prepareContentHeaders(headers map[string][]s
if input.ContentLanguage != "" {
headers[HEADER_CONTENT_LANGUAGE_CAMEL] = []string{input.ContentLanguage}
}
-
if input.ContentType != "" {
headers[HEADER_CONTENT_TYPE_CAML] = []string{input.ContentType}
}
+ // 这里为了兼容老版本,默认以Expire值为准,但如果Expires没有,则以HttpExpires为准。
+ if input.Expires != "" {
+ headers[HEADER_EXPIRES_CAMEL] = []string{input.Expires}
+ } else if input.HttpExpires != "" {
+ headers[HEADER_EXPIRES_CAMEL] = []string{input.HttpExpires}
+ }
}
func (input SetObjectMetadataInput) prepareStorageClass(headers map[string][]string, isObs bool) {
@@ -189,13 +197,8 @@ func (input SetObjectMetadataInput) trans(isObs bool) (params map[string]string,
} else {
setHeaders(headers, HEADER_METADATA_DIRECTIVE, []string{string(ReplaceNew)}, isObs)
}
- if input.CacheControl != "" {
- headers[HEADER_CACHE_CONTROL_CAMEL] = []string{input.CacheControl}
- }
+
input.prepareContentHeaders(headers)
- if input.Expires != "" {
- headers[HEADER_EXPIRES_CAMEL] = []string{input.Expires}
- }
if input.WebsiteRedirectLocation != "" {
setHeaders(headers, HEADER_WEBSITE_REDIRECT_LOCATION, []string{input.WebsiteRedirectLocation}, isObs)
}
@@ -242,7 +245,9 @@ func (input GetObjectInput) trans(isObs bool) (params map[string]string, headers
if input.RangeStart >= 0 && input.RangeEnd > input.RangeStart {
headers[HEADER_RANGE] = []string{fmt.Sprintf("bytes=%d-%d", input.RangeStart, input.RangeEnd)}
}
-
+ if input.AcceptEncoding != "" {
+ headers[HEADER_ACCEPT_ENCODING] = []string{input.AcceptEncoding}
+ }
if input.IfMatch != "" {
headers[HEADER_IF_MATCH] = []string{input.IfMatch}
}
diff --git a/obs/transfer.go b/obs/transfer.go
index 956629e..d1a783c 100644
--- a/obs/transfer.go
+++ b/obs/transfer.go
@@ -364,12 +364,18 @@ func (obsClient ObsClient) resumeUpload(input *UploadFileInput, extensions []ext
return completeOutput, err
}
-func handleUploadTaskResult(result interface{}, ufc *UploadCheckpoint, partNum int, enableCheckpoint bool, checkpointFilePath string, lock *sync.Mutex) (err error) {
+func handleUploadTaskResult(result interface{}, ufc *UploadCheckpoint, partNum int, enableCheckpoint bool, checkpointFilePath string, lock *sync.Mutex, completedBytes *int64, listener ProgressListener) (err error) {
if uploadPartOutput, ok := result.(*UploadPartOutput); ok {
lock.Lock()
defer lock.Unlock()
ufc.UploadParts[partNum-1].Etag = uploadPartOutput.ETag
ufc.UploadParts[partNum-1].IsCompleted = true
+
+ atomic.AddInt64(completedBytes, ufc.UploadParts[partNum-1].PartSize)
+
+ event := newProgressEvent(TransferDataEvent, *completedBytes, ufc.FileInfo.Size)
+ publishProgress(listener, event)
+
if enableCheckpoint {
_err := updateCheckpointFile(ufc, checkpointFilePath)
if _err != nil {
@@ -390,11 +396,21 @@ func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpoin
var errFlag int32
var abort int32
lock := new(sync.Mutex)
+
+ var completedBytes int64
+ listener := obsClient.getProgressListener(extensions)
+ totalBytes := ufc.FileInfo.Size
+ event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+ publishProgress(listener, event)
+
for _, uploadPart := range ufc.UploadParts {
if atomic.LoadInt32(&abort) == 1 {
break
}
if uploadPart.IsCompleted {
+ atomic.AddInt64(&completedBytes, uploadPart.PartSize)
+ event := newProgressEvent(TransferDataEvent, completedBytes, ufc.FileInfo.Size)
+ publishProgress(listener, event)
continue
}
task := uploadPartTask{
@@ -415,7 +431,7 @@ func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpoin
}
pool.ExecuteFunc(func() interface{} {
result := task.Run()
- err := handleUploadTaskResult(result, ufc, task.PartNumber, input.EnableCheckpoint, input.CheckpointFile, lock)
+ err := handleUploadTaskResult(result, ufc, task.PartNumber, input.EnableCheckpoint, input.CheckpointFile, lock, &completedBytes, listener)
if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) {
uploadPartError.Store(err)
}
@@ -424,8 +440,14 @@ func (obsClient ObsClient) uploadPartConcurrent(ufc *UploadCheckpoint, checkpoin
}
pool.ShutDown()
if err, ok := uploadPartError.Load().(error); ok {
+
+ event := newProgressEvent(TransferFailedEvent, completedBytes, ufc.FileInfo.Size)
+ publishProgress(listener, event)
+
return err
}
+ event = newProgressEvent(TransferCompletedEvent, completedBytes, ufc.FileInfo.Size)
+ publishProgress(listener, event)
return nil
}
@@ -513,9 +535,9 @@ func (task *downloadPartTask) Run() interface{} {
var output *GetObjectOutput
var err error
if len(task.extensions) != 0 {
- output, err = task.obsClient.GetObject(getObjectInput, task.extensions...)
+ output, err = task.obsClient.GetObjectWithoutProgress(getObjectInput, task.extensions...)
} else {
- output, err = task.obsClient.GetObject(getObjectInput)
+ output, err = task.obsClient.GetObjectWithoutProgress(getObjectInput)
}
if err == nil {
@@ -807,11 +829,17 @@ func updateDownloadFile(filePath string, rangeStart int64, output *GetObjectOutp
return nil
}
-func handleDownloadTaskResult(result interface{}, dfc *DownloadCheckpoint, partNum int64, enableCheckpoint bool, checkpointFile string, lock *sync.Mutex) (err error) {
- if _, ok := result.(*GetObjectOutput); ok {
+func handleDownloadTaskResult(result interface{}, dfc *DownloadCheckpoint, partNum int64, enableCheckpoint bool, checkpointFile string, lock *sync.Mutex, completedBytes *int64, listener ProgressListener) (err error) {
+ if output, ok := result.(*GetObjectOutput); ok {
lock.Lock()
defer lock.Unlock()
dfc.DownloadParts[partNum-1].IsCompleted = true
+
+ atomic.AddInt64(completedBytes, output.ContentLength)
+
+ event := newProgressEvent(TransferDataEvent, *completedBytes, dfc.ObjectInfo.Size)
+ publishProgress(listener, event)
+
if enableCheckpoint {
_err := updateCheckpointFile(dfc, checkpointFile)
if _err != nil {
@@ -832,11 +860,21 @@ func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc
var errFlag int32
var abort int32
lock := new(sync.Mutex)
+
+ var completedBytes int64
+ listener := obsClient.getProgressListener(extensions)
+ totalBytes := dfc.ObjectInfo.Size
+ event := newProgressEvent(TransferStartedEvent, 0, totalBytes)
+ publishProgress(listener, event)
+
for _, downloadPart := range dfc.DownloadParts {
if atomic.LoadInt32(&abort) == 1 {
break
}
if downloadPart.IsCompleted {
+ atomic.AddInt64(&completedBytes, downloadPart.RangeEnd-downloadPart.Offset+1)
+ event := newProgressEvent(TransferDataEvent, completedBytes, dfc.ObjectInfo.Size)
+ publishProgress(listener, event)
continue
}
task := downloadPartTask{
@@ -858,7 +896,7 @@ func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc
}
pool.ExecuteFunc(func() interface{} {
result := task.Run()
- err := handleDownloadTaskResult(result, dfc, task.partNumber, input.EnableCheckpoint, input.CheckpointFile, lock)
+ err := handleDownloadTaskResult(result, dfc, task.partNumber, input.EnableCheckpoint, input.CheckpointFile, lock, &completedBytes, listener)
if err != nil && atomic.CompareAndSwapInt32(&errFlag, 0, 1) {
downloadPartError.Store(err)
}
@@ -867,8 +905,11 @@ func (obsClient ObsClient) downloadFileConcurrent(input *DownloadFileInput, dfc
}
pool.ShutDown()
if err, ok := downloadPartError.Load().(error); ok {
+ event := newProgressEvent(TransferFailedEvent, completedBytes, dfc.ObjectInfo.Size)
+ publishProgress(listener, event)
return err
}
-
+ event = newProgressEvent(TransferCompletedEvent, completedBytes, dfc.ObjectInfo.Size)
+ publishProgress(listener, event)
return nil
}
diff --git a/obs/util.go b/obs/util.go
index 15683d3..062cc10 100644
--- a/obs/util.go
+++ b/obs/util.go
@@ -13,6 +13,7 @@
package obs
import (
+ "bytes"
"crypto/hmac"
"crypto/md5"
"crypto/sha1"
@@ -22,8 +23,10 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
+ "io"
"net/http"
"net/url"
+ "os"
"regexp"
"strconv"
"strings"
@@ -68,7 +71,7 @@ func IsHandleCallbackResponse(action string, headers map[string][]string, isObs
if isObs == true {
headerPrefix = HEADER_PREFIX_OBS
}
- supportCallbackActions := []string{"PutObject", "PutFile", "CompleteMultipartUpload"}
+ supportCallbackActions := []string{PUT_OBJECT, PUT_FILE, "CompleteMultipartUpload"}
return len(headers[headerPrefix+CALLBACK]) != 0 && IsContain(supportCallbackActions, action)
}
@@ -581,3 +584,37 @@ func getTemporaryAuthorization(ak, sk, method, bucketName, objectKey, signature
return
}
+
+func GetContentType(key string) (string, bool) {
+ if ct, ok := mimeTypes[strings.ToLower(key[strings.LastIndex(key, ".")+1:])]; ok {
+ return ct, ok
+ }
+ return "", false
+}
+
+func GetReaderLen(reader io.Reader) (int64, error) {
+ var contentLength int64
+ var err error
+ switch v := reader.(type) {
+ case *bytes.Buffer:
+ contentLength = int64(v.Len())
+ case *bytes.Reader:
+ contentLength = int64(v.Len())
+ case *strings.Reader:
+ contentLength = int64(v.Len())
+ case *os.File:
+ fInfo, fError := v.Stat()
+ if fError != nil {
+ err = fmt.Errorf("can't get reader content length,%s", fError.Error())
+ } else {
+ contentLength = fInfo.Size()
+ }
+ case *io.LimitedReader:
+ contentLength = int64(v.N)
+ case *fileReaderWrapper:
+ contentLength = int64(v.totalCount)
+ default:
+ err = fmt.Errorf("can't get reader content length,unkown reader type")
+ }
+ return contentLength, err
+}