Skip to content

Commit

Permalink
fix oss download path
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Jan 23, 2024
1 parent d334ddb commit ff85860
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 120 deletions.
124 changes: 124 additions & 0 deletions cluster_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@ package main

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -84,7 +91,9 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
case path := <-pathRes:
if path != "" {
defer os.Remove(path)
relpath := hashToFilename(f.Hash)
for _, target := range f.targets {
target = filepath.Join(target, relpath)
if err := copyFile(path, target, 0644); err != nil {
logErrorf("Could not copy file %q to %q:\n\t%v", path, target, err)
}
Expand All @@ -108,3 +117,118 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
logInfof("All files was synchronized, use time: %v, %s/s", use, bytesToUnit(stats.totalsize/use.Seconds()))
return nil
}

func createOssMirrorDir(item *OSSItem) {
logInfof("Creating OSS folder %s", item.FolderPath)
if err := os.MkdirAll(item.FolderPath, 0755); err != nil && !errors.Is(err, os.ErrExist) {
logErrorf("Cannot create OSS mirror folder %q: %v", item.FolderPath, err)
os.Exit(2)
}

// cacheDir := filepath.Join(baseDir, "cache")
// downloadDir := filepath.Join(item.FolderPath, "download")
// os.RemoveAll(downloadDir)
// if err := os.Mkdir(downloadDir, 0755); err != nil && !errors.Is(err, os.ErrExist) {
// logErrorf("Cannot create OSS mirror folder %q: %v", downloadDir, err)
// os.Exit(2)
// }
// for i := 0; i < 0x100; i++ {
// d := hex.EncodeToString([]byte{(byte)(i)})
// o := filepath.Join(cacheDir, d)
// t := filepath.Join(downloadDir, d)
// os.Mkdir(o, 0755)
// if err := os.Symlink(filepath.Join("..", "..", o), t); err != nil {
// logErrorf("Cannot create OSS mirror cache symlink %q: %v", t, err)
// os.Exit(2)
// }
// }

if !item.SkipMeasureGen {
logDebug("Creating measure files")
measureDir := filepath.Join(item.FolderPath, "measure")
if err := os.Mkdir(measureDir, 0755); err != nil && !errors.Is(err, os.ErrExist) {
logErrorf("Cannot create OSS mirror folder %q: %v", measureDir, err)
os.Exit(2)
}
const chunkSize = 1024 * 1024
var chunk [chunkSize]byte
for i := 1; i <= 200; i++ {
size := i * chunkSize
t := filepath.Join(measureDir, strconv.Itoa(i))
if stat, err := os.Stat(t); err == nil {
x := stat.Size()
if x == (int64)(size) {
logDebug("Skipping", t)
continue
}
logDebugf("File [%d] size %d does not match %d", i, x, size)
} else {
logDebugf("Cannot get stat of %s: %v", t, err)
}
logDebug("Writing", t)
fd, err := os.Create(t)
if err != nil {
logErrorf("Cannot create OSS mirror measure file %q: %v", t, err)
os.Exit(2)
}
for j := 0; j < i; j++ {
if _, err = fd.Write(chunk[:]); err != nil {
logErrorf("Cannot write OSS mirror measure file %q: %v", t, err)
os.Exit(2)
}
}
fd.Close()
}
logDebug("Measure files created")
}
}

func checkOSS(ctx context.Context, client *http.Client, item *OSSItem, size int) (supportRange bool, err error) {
targetSize := (int64)(size) * 1024 * 1024
logInfof("Checking %s for %d bytes ...", item.RedirectBase, targetSize)

target, err := url.JoinPath(item.RedirectBase, "measure", strconv.Itoa(size))
if err != nil {
return false, fmt.Errorf("Cannot check OSS server: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
if err != nil {
return
}
req.Header.Set("Range", "bytes=1-")
res, err := client.Do(req)
if err != nil {
return false, fmt.Errorf("OSS check request failed %q: %w", target, err)
}
defer res.Body.Close()
logDebugf("OSS check response status code %d %s", res.StatusCode, res.Status)
if supportRange = res.StatusCode == http.StatusPartialContent; supportRange {
logDebug("OSS support Range header!")
targetSize--
} else if res.StatusCode != http.StatusOK {
return false, fmt.Errorf("OSS check request failed %q: %d %s", target, res.StatusCode, res.Status)
} else {
crange := res.Header.Get("Content-Range")
if len(crange) > 0 {
logWarn("Non standard http response detected, responsed 'Content-Range' header with status 200, expected status 206")
fields := strings.Fields(crange)
if len(fields) >= 2 && fields[0] == "bytes" && strings.HasPrefix(fields[1], "1-") {
logDebug("OSS support Range header?")
supportRange = true
targetSize--
}
}
}
logDebug("reading OSS response")
start := time.Now()
n, err := io.Copy(io.Discard, res.Body)
if err != nil {
return false, fmt.Errorf("OSS check request failed %q: %w", target, err)
}
used := time.Since(start)
if n != targetSize {
return false, fmt.Errorf("OSS check request failed %q: expected %d bytes, but got %d bytes", target, targetSize, n)
}
logInfof("Check finished for %q, used %v, %s/s; supportRange=%v", target, used, bytesToUnit((float64)(n)/used.Seconds()), supportRange)
return
}
120 changes: 0 additions & 120 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,10 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -106,7 +102,6 @@ START:
os.Exit(1)
}
}()
time.Sleep(time.Second * 100000)
}

var ossList []*OSSItem
Expand Down Expand Up @@ -289,118 +284,3 @@ START:
}
}
}

func createOssMirrorDir(item *OSSItem) {
logInfof("Creating OSS folder %s", item.FolderPath)
if err := os.MkdirAll(item.FolderPath, 0755); err != nil && !errors.Is(err, os.ErrExist) {
logErrorf("Cannot create OSS mirror folder %q: %v", item.FolderPath, err)
os.Exit(2)
}

// cacheDir := filepath.Join(baseDir, "cache")
// downloadDir := filepath.Join(item.FolderPath, "download")
// os.RemoveAll(downloadDir)
// if err := os.Mkdir(downloadDir, 0755); err != nil && !errors.Is(err, os.ErrExist) {
// logErrorf("Cannot create OSS mirror folder %q: %v", downloadDir, err)
// os.Exit(2)
// }
// for i := 0; i < 0x100; i++ {
// d := hex.EncodeToString([]byte{(byte)(i)})
// o := filepath.Join(cacheDir, d)
// t := filepath.Join(downloadDir, d)
// os.Mkdir(o, 0755)
// if err := os.Symlink(filepath.Join("..", "..", o), t); err != nil {
// logErrorf("Cannot create OSS mirror cache symlink %q: %v", t, err)
// os.Exit(2)
// }
// }

if !item.SkipMeasureGen {
logDebug("Creating measure files")
measureDir := filepath.Join(item.FolderPath, "measure")
if err := os.Mkdir(measureDir, 0755); err != nil && !errors.Is(err, os.ErrExist) {
logErrorf("Cannot create OSS mirror folder %q: %v", measureDir, err)
os.Exit(2)
}
const chunkSize = 1024 * 1024
var chunk [chunkSize]byte
for i := 1; i <= 200; i++ {
size := i * chunkSize
t := filepath.Join(measureDir, strconv.Itoa(i))
if stat, err := os.Stat(t); err == nil {
x := stat.Size()
if x == (int64)(size) {
logDebug("Skipping", t)
continue
}
logDebugf("File [%d] size %d does not match %d", i, x, size)
} else {
logDebugf("Cannot get stat of %s: %v", t, err)
}
logDebug("Writing", t)
fd, err := os.Create(t)
if err != nil {
logErrorf("Cannot create OSS mirror measure file %q: %v", t, err)
os.Exit(2)
}
for j := 0; j < i; j++ {
if _, err = fd.Write(chunk[:]); err != nil {
logErrorf("Cannot write OSS mirror measure file %q: %v", t, err)
os.Exit(2)
}
}
fd.Close()
}
logDebug("Measure files created")
}
}

func checkOSS(ctx context.Context, client *http.Client, item *OSSItem, size int) (supportRange bool, err error) {
targetSize := (int64)(size) * 1024 * 1024
logInfof("Checking %s for %d bytes ...", item.RedirectBase, targetSize)

target, err := url.JoinPath(item.RedirectBase, "measure", strconv.Itoa(size))
if err != nil {
return false, fmt.Errorf("Cannot check OSS server: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
if err != nil {
return
}
req.Header.Set("Range", "bytes=1-")
res, err := client.Do(req)
if err != nil {
return false, fmt.Errorf("OSS check request failed %q: %w", target, err)
}
defer res.Body.Close()
logDebugf("OSS check response status code %d %s", res.StatusCode, res.Status)
if supportRange = res.StatusCode == http.StatusPartialContent; supportRange {
logDebug("OSS support Range header!")
targetSize--
} else if res.StatusCode != http.StatusOK {
return false, fmt.Errorf("OSS check request failed %q: %d %s", target, res.StatusCode, res.Status)
} else {
crange := res.Header.Get("Content-Range")
if len(crange) > 0 {
logWarn("Non standard http response detected, responsed 'Content-Range' header with status 200, expected status 206")
fields := strings.Fields(crange)
if len(fields) >= 2 && fields[0] == "bytes" && strings.HasPrefix(fields[1], "1-") {
logDebug("OSS support Range header?")
supportRange = true
targetSize--
}
}
}
logDebug("reading OSS response")
start := time.Now()
n, err := io.Copy(io.Discard, res.Body)
if err != nil {
return false, fmt.Errorf("OSS check request failed %q: %w", target, err)
}
used := time.Since(start)
if n != targetSize {
return false, fmt.Errorf("OSS check request failed %q: expected %d bytes, but got %d bytes", target, targetSize, n)
}
logInfof("Check finished for %q, used %v, %s/s; supportRange=%v", target, used, bytesToUnit((float64)(n)/used.Seconds()), supportRange)
return
}

0 comments on commit ff85860

Please sign in to comment.