From ee34e5d3f90cc0aa6719e58136bdc91f9811d8e8 Mon Sep 17 00:00:00 2001 From: zyxkad Date: Thu, 14 Dec 2023 16:57:56 -0700 Subject: [PATCH] add data/stat.json --- .gitignore | 2 + src/cluster.go | 43 ++++++--- src/handler.go | 4 +- src/main.go | 22 +++-- src/stat.go | 258 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 306 insertions(+), 23 deletions(-) create mode 100644 src/stat.go diff --git a/.gitignore b/.gitignore index 6c1401d8..4b4b3c4f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,8 +6,10 @@ /test /pems /cache +/data /logs /.tmp +/__hijack /oss_mirror # binarys diff --git a/src/cluster.go b/src/cluster.go index c5fef8e4..aead80bf 100644 --- a/src/cluster.go +++ b/src/cluster.go @@ -38,10 +38,13 @@ type Cluster struct { cacheDir string tmpDir string + dataDir string maxConn int - hits atomic.Int32 - hbytes atomic.Int64 - issync atomic.Bool + + stats Stats + hits atomic.Int32 + hbts atomic.Int64 + issync atomic.Bool ctx context.Context mux sync.Mutex @@ -55,13 +58,13 @@ type Cluster struct { } func NewCluster( - ctx context.Context, cacheDir string, + ctx context.Context, baseDir string, host string, publicPort uint16, username string, password string, address string, byoc bool, dialer *net.Dialer, redirectBase string, -) (cr *Cluster) { +) (cr *Cluster, err error) { transport := &http.Transport{} if dialer != nil { transport.DialContext = dialer.DialContext @@ -79,8 +82,9 @@ func NewCluster( redirectBase: redirectBase, - cacheDir: cacheDir, - tmpDir: filepath.Join(cacheDir, ".tmp"), + cacheDir: filepath.Join(baseDir, "cache"), + tmpDir: filepath.Join(baseDir, "cache", ".tmp"), + dataDir: filepath.Join(baseDir, "data"), maxConn: 128, client: &http.Client{ @@ -91,9 +95,17 @@ func NewCluster( }, } cr.Server.Handler = cr - os.MkdirAll(cr.cacheDir, 0755) + + // create folder strcture os.RemoveAll(cr.tmpDir) - os.Mkdir(cr.tmpDir, 0700) + os.MkdirAll(cr.cacheDir, 0755) + os.MkdirAll(cr.dataDir, 0755) + os.MkdirAll(cr.tmpDir, 0700) + + // read old stats + if err = cr.stats.Load(cr.dataDir); err != nil { + return + } return } @@ -195,13 +207,18 @@ func (cr *Cluster) Enable() (err error) { return } +// KeepAlive will fresh hits & hit bytes data and send the keep-alive packet func (cr *Cluster) KeepAlive() (ok bool) { - hits, hbytes := cr.hits.Swap(0), cr.hbytes.Swap(0) + hits, hbts := cr.hits.Swap(0), cr.hbts.Swap(0) + cr.stats.AddHits(hits, hbts) data, err := cr.socket.EmitAck("keep-alive", json.JsonObj{ "time": time.Now().UTC().Format("2006-01-02T15:04:05Z"), "hits": hits, - "bytes": hbytes, + "bytes": hbts, }) + if e := cr.stats.Save(cr.dataDir); e != nil { + logError("Error when saving status:", e) + } if err != nil { logError("Error when keep-alive:", err) return false @@ -210,7 +227,7 @@ func (cr *Cluster) KeepAlive() (ok bool) { logError("Keep-alive failed:", erro) return false } - logInfo("Keep-alive success:", hits, bytesToUnit((float64)(hbytes)), data) + logInfo("Keep-alive success:", hits, bytesToUnit((float64)(hbts)), data) return true } @@ -218,7 +235,6 @@ func (cr *Cluster) Disable() (ok bool) { cr.mux.Lock() defer cr.mux.Unlock() - cr.KeepAlive() if !cr.enabled { logDebug("Extra disable") return true @@ -231,6 +247,7 @@ func (cr *Cluster) Disable() (ok bool) { if cr.socket == nil { return true } + cr.KeepAlive() data, err := cr.socket.EmitAck("disable") cr.enabled = false cr.socket.Close() diff --git a/src/handler.go b/src/handler.go index 332a7314..d5d6a0e6 100644 --- a/src/handler.go +++ b/src/handler.go @@ -55,7 +55,7 @@ func (cr *Cluster) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } http.Redirect(rw, req, target, http.StatusFound) cr.hits.Add(1) - cr.hbytes.Add(stat.Size()) + cr.hbts.Add(stat.Size()) return } rw.Header().Set("Cache-Control", "max-age=2592000") // 30 days @@ -69,7 +69,7 @@ func (cr *Cluster) ServeHTTP(rw http.ResponseWriter, req *http.Request) { counter := &countReader{ReadSeeker: fd} http.ServeContent(rw, req, name, time.Time{}, counter) cr.hits.Add(1) - cr.hbytes.Add(counter.n) + cr.hbts.Add(counter.n) return case strings.HasPrefix(rawpath, "/measure/"): if req.Header.Get("x-openbmclapi-secret") != cr.password { diff --git a/src/main.go b/src/main.go index 6f5a445d..190fedfc 100644 --- a/src/main.go +++ b/src/main.go @@ -114,11 +114,12 @@ func readConfig() { } } -const cacheDir = "cache" +const baseDir = "." -var hijackPath = filepath.Join(cacheDir, "__hijack") - -const ossMirrorDir = "oss_mirror" +var ( + hijackPath = filepath.Join(baseDir, "__hijack") + ossMirrorDir = filepath.Join(baseDir, "oss_mirror") +) func main() { defer func() { @@ -168,15 +169,19 @@ START: if config.UseOss { redirectBase = config.OssRedirectBase } - cluster := NewCluster(ctx, cacheDir, + + logInfof("Starting Go-OpenBmclApi v%s (%s)", ClusterVersion, BuildVersion) + cluster, err := NewCluster(ctx, baseDir, config.PublicHost, config.PublicPort, config.ClusterId, config.ClusterSecret, fmt.Sprintf("%s:%d", "0.0.0.0", config.Port), config.Nohttps, dialer, redirectBase, ) - - logInfof("Starting Go-OpenBmclApi v%s (%s)", ClusterVersion, BuildVersion) + if err != nil { + logError("Cannot init cluster:", err) + os.Exit(1) + } { logInfof("Fetching file list") @@ -290,6 +295,7 @@ func createOssMirrorDir() { logErrorf("Cannot create OSS mirror folder %q: %v", ossMirrorDir, err) os.Exit(2) } + cacheDir := filepath.Join(baseDir, "cache") downloadDir := filepath.Join(ossMirrorDir, "download") os.RemoveAll(downloadDir) if err := os.Mkdir(downloadDir, 0755); err != nil && !errors.Is(err, os.ErrExist) { @@ -361,7 +367,7 @@ func assertOSS(size int) { os.Exit(2) } if n != (int64)(size)*1024*1024 { - logErrorf("OSS check request failed %q: expected 10MB, but got %d bytes", target, n) + logErrorf("OSS check request failed %q: expected %dMB, but got %d bytes", target, size, n) os.Exit(2) } } diff --git a/src/stat.go b/src/stat.go new file mode 100644 index 00000000..304ba530 --- /dev/null +++ b/src/stat.go @@ -0,0 +1,258 @@ +package main + +import ( + "encoding/json" + "errors" + "os" + "path/filepath" + "strconv" + "sync" + "time" +) + +type statInstData struct { + Hits int32 `json:"hits"` + Bytes int64 `json:"bytes"` +} + +func (d *statInstData) update(hits int32, bytes int64) { + d.Hits += hits + d.Bytes += bytes +} + +// statTime always save a UTC time +type statTime struct { + Hour int `json:"hour"` + Day int `json:"day"` + Month int `json:"month"` + Year int `json:"year"` +} + +func makeStatTime(t time.Time) (st statTime) { + t = t.UTC() + st.Hour = t.Hour() + y, m, d := t.Date() + st.Day = d - 1 + st.Month = (int)(m) - 1 + st.Year = y + return +} + +func (t statTime) IsLastDay() bool { + return time.Date(t.Year, (time.Month)(t.Month+1), t.Day+1+1, 0, 0, 0, 0, time.UTC).Day() == 1 +} + +type statHistoryData struct { + Hours [24]statInstData `json:"hours"` + Days [31]statInstData `json:"days"` + Months [12]statInstData `json:"months"` +} + +type statData struct { + Date statTime `json:"date"` + statHistoryData + Prev statHistoryData `json:"prev"` + Years map[string]statInstData `json:"years"` +} + +func (d *statData) update(hits int32, bytes int64) { + now := makeStatTime(time.Now()) + if d.Date.Year != 0 { + switch { + case d.Date.Year != now.Year: + iscont := now.Year == d.Date.Year+1 + isMonthCont := iscont && now.Month == 0 && d.Date.Month+1 == len(d.Months) + var inst statInstData + for i := 0; i < d.Date.Month; i++ { + n := d.Months[i] + inst.update(n.Hits, n.Bytes) + } + if iscont { + for i := 0; i <= d.Date.Day; i++ { + n := d.Days[i] + inst.update(n.Hits, n.Bytes) + } + if isMonthCont { + for i := 0; i <= d.Date.Hour; i++ { + n := d.Hours[i] + inst.update(n.Hits, n.Bytes) + } + } + } + if d.Years == nil { + d.Years = make(map[string]statInstData, 2) + } + d.Years[strconv.Itoa(d.Date.Year)] = inst + // update history data + if iscont { + if isMonthCont { + if now.Day == 0 && d.Date.IsLastDay() { + d.Prev.Hours = d.Hours + for i := d.Date.Hour + 1; i < len(d.Hours); i++ { + d.Prev.Hours[i] = statInstData{} + } + } else { + d.Prev.Hours = [len(d.Hours)]statInstData{} + } + d.Prev.Days = d.Days + for i := d.Date.Day + 1; i < len(d.Days); i++ { + d.Prev.Days[i] = statInstData{} + } + } else { + d.Prev.Days = [len(d.Days)]statInstData{} + } + d.Prev.Months = d.Months + for i := d.Date.Month + 1; i < len(d.Months); i++ { + d.Prev.Months[i] = statInstData{} + } + } else { + d.Prev.Months = [len(d.Months)]statInstData{} + } + d.Months = [len(d.Months)]statInstData{} + case d.Date.Month != now.Month: + iscont := now.Month == d.Date.Month+1 + var inst statInstData + for i := 0; i < d.Date.Day; i++ { + n := d.Days[i] + inst.update(n.Hits, n.Bytes) + } + if iscont { + for i := 0; i <= d.Date.Hour; i++ { + n := d.Hours[i] + inst.update(n.Hits, n.Bytes) + } + } + d.Months[d.Date.Month] = inst + // clean up + for i := d.Date.Month + 1; i < now.Month; i++ { + d.Months[i] = statInstData{} + } + // update history data + if iscont { + if now.Day == 0 && d.Date.IsLastDay() { + d.Prev.Hours = d.Hours + for i := d.Date.Hour + 1; i < len(d.Hours); i++ { + d.Prev.Hours[i] = statInstData{} + } + } else { + d.Prev.Hours = [len(d.Hours)]statInstData{} + } + d.Prev.Days = d.Days + for i := d.Date.Day + 1; i < len(d.Days); i++ { + d.Prev.Days[i] = statInstData{} + } + } else { + d.Prev.Days = [len(d.Days)]statInstData{} + } + d.Days = [len(d.Days)]statInstData{} + case d.Date.Day != now.Day: + var inst statInstData + for i := 0; i <= d.Date.Hour; i++ { + n := d.Hours[i] + inst.update(n.Hits, n.Bytes) + } + d.Days[d.Date.Day] = inst + // clean up + for i := d.Date.Day + 1; i < now.Day; i++ { + d.Days[i] = statInstData{} + } + // update history data + if now.Day == d.Date.Day+1 { + d.Prev.Hours = d.Hours + for i := d.Date.Hour + 1; i < len(d.Hours); i++ { + d.Prev.Hours[i] = statInstData{} + } + } else { + d.Prev.Hours = [24]statInstData{} + } + d.Hours = [24]statInstData{} + case d.Date.Hour != now.Hour: + // clean up + for i := d.Date.Hour + 1; i < now.Hour; i++ { + d.Hours[i] = statInstData{} + } + } + } + + d.Hours[now.Hour].update(hits, bytes) + d.Date = now +} + +type Stats struct { + mux sync.RWMutex + statData +} + +const statsFileName = "stat.json" + +func (s *Stats) Load(dir string) (err error) { + s.mux.Lock() + defer s.mux.Unlock() + + if err = parseFileOrOld(filepath.Join(dir, statsFileName), func(buf []byte) error { + return json.Unmarshal(buf, &s.statData) + }); err != nil { + return + } + return +} + +// Save +func (s *Stats) Save(dir string) (err error) { + s.mux.Lock() + defer s.mux.Unlock() + + buf, err := json.Marshal(&s.statData) + if err != nil { + return + } + + if err = writeFileWithOld(filepath.Join(dir, statsFileName), buf, 0644); err != nil { + return + } + return +} + +func (s *Stats) AddHits(hits int32, bytes int64) { + s.mux.Lock() + defer s.mux.Unlock() + + s.update(hits, bytes) +} + +func parseFileOrOld(path string, parser func(buf []byte) error) (err error) { + oldpath := path + ".old" + buf, err := os.ReadFile(path) + if err == nil { + if err = parser(buf); err == nil { + return + } + } + buf, er := os.ReadFile(oldpath) + if er == nil { + if er = parser(buf); er == nil { + return + } + } + if errors.Is(err, os.ErrNotExist) { + if errors.Is(er, os.ErrNotExist) { + return nil + } + err = er + } + return +} + +func writeFileWithOld(path string, buf []byte, mode os.FileMode) (err error) { + oldpath := path + ".old" + if err = os.Remove(oldpath); err != nil && !errors.Is(err, os.ErrNotExist) { + return + } + if err = os.Rename(path, oldpath); err != nil { + return + } + if err = os.WriteFile(path, buf, mode); err != nil { + return + } + return +}