From 3b481c4cdbaeb739b3665011b80edd19f78966fd Mon Sep 17 00:00:00 2001 From: Akanksha Jain Date: Thu, 26 Dec 2024 16:53:17 +0530 Subject: [PATCH 1/2] Refactored code --- xload/adapter/http_loader.go | 56 +++++ xload/contract/xcomponent.go | 110 ++++++++++ xload/core/block.go | 96 +++++++++ xload/core/blockpool.go | 116 ++++++++++ xload/core/model/mode.go | 89 ++++++++ xload/core/threadpool.go | 107 ++++++++++ xload/core/workItem.go | 56 +++++ xload/implementation/data_manager.go | 133 ++++++++++++ xload/implementation/lister.go | 199 +++++++++++++++++ xload/implementation/splitter.go | 201 ++++++++++++++++++ xload/utils/helper.go | 34 +++ xload/xload.go | 306 +++++++++++++++++++++++++++ 12 files changed, 1503 insertions(+) create mode 100644 xload/adapter/http_loader.go create mode 100644 xload/contract/xcomponent.go create mode 100644 xload/core/block.go create mode 100644 xload/core/blockpool.go create mode 100644 xload/core/model/mode.go create mode 100644 xload/core/threadpool.go create mode 100644 xload/core/workItem.go create mode 100644 xload/implementation/data_manager.go create mode 100644 xload/implementation/lister.go create mode 100644 xload/implementation/splitter.go create mode 100644 xload/utils/helper.go create mode 100644 xload/xload.go diff --git a/xload/adapter/http_loader.go b/xload/adapter/http_loader.go new file mode 100644 index 000000000..cc885079e --- /dev/null +++ b/xload/adapter/http_loader.go @@ -0,0 +1,56 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. 
+ Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package adapter + +import ( + "xload/model" + "xload/utils" +) + +type HTTPLoader struct { + // Dependencies like HTTP client +} + +func NewHTTPLoader() *HTTPLoader { + return &HTTPLoader{} +} + +func (h *HTTPLoader) Load(data model.LoadSpec) error { + // Implement HTTP loading logic + err := utils.FetchData(data.URL) + if err != nil { + return err + } + return nil +} diff --git a/xload/contract/xcomponent.go b/xload/contract/xcomponent.go new file mode 100644 index 000000000..42fc1e7e9 --- /dev/null +++ b/xload/contract/xcomponent.go @@ -0,0 +1,110 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. 
+ Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package contract + +import ( + "github.com/Azure/azure-storage-fuse/v2/internal" + "github.com/Azure/azure-storage-fuse/v2/xload/core" +) + +type Xcomponent interface { + Init() + Start() + Stop() + Process(item *core.WorkItem) (int, error) + GetNext() Xcomponent + SetNext(s Xcomponent) + GetThreadPool() *core.ThreadPool + GetRemote() internal.Component + GetName() string + SetName(s string) + SetRemote(remote internal.Component) + SetThreadPool(pool *core.ThreadPool) +} + +type Xbase struct { + name string + pool *core.ThreadPool + remote internal.Component + next Xcomponent +} + +// SetThreadPool implements Xcomponent. +func (xb *Xbase) SetThreadPool(pool *core.ThreadPool) { + xb.pool = pool +} + +// SetRemote implements Xcomponent. 
+func (xb *Xbase) SetRemote(remote internal.Component) { + xb.remote = remote +} + +var _ Xcomponent = &Xbase{} + +func (xb *Xbase) Init() { +} + +func (xb *Xbase) Start() { +} + +func (xb *Xbase) Stop() { +} + +func (xb *Xbase) Process(item *core.WorkItem) (int, error) { + return 0, nil +} + +func (xb *Xbase) GetNext() Xcomponent { + return xb.next +} + +func (xb *Xbase) SetNext(s Xcomponent) { + xb.next = s +} + +func (xb *Xbase) GetThreadPool() *core.ThreadPool { + return xb.pool +} + +func (xb *Xbase) GetRemote() internal.Component { + return xb.remote +} + +func (xb *Xbase) GetName() string { + return xb.name +} + +func (xb *Xbase) SetName(s string) { + xb.name = s +} diff --git a/xload/core/block.go b/xload/core/block.go new file mode 100644 index 000000000..641f5f329 --- /dev/null +++ b/xload/core/block.go @@ -0,0 +1,96 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. + Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. 
// Block is a memory mapped buffer with its state to hold data
type Block struct {
	Index  int    // Index of the block in the pool
	Offset int64  // Start offset of the data this block holds
	Length int64  // Length of data that this block holds
	Id     string // ID to represent this block in the blob
	Data   []byte // Data this block holds
}

// AllocateBlock creates a new anonymous, private, read-write memory mapped
// buffer of the given size and returns a Block whose metadata is zeroed.
//
// It returns an "invalid size" error when size is zero or cannot be
// represented as an int (the length type syscall.Mmap takes); without that
// check the conversion would silently truncate the request. A failure of
// mmap itself is wrapped with %w so callers can inspect the underlying
// syscall.Errno via errors.Is / errors.As.
func AllocateBlock(size uint64) (*Block, error) {
	length := int(size)
	if size == 0 || length <= 0 || uint64(length) != size {
		return nil, fmt.Errorf("invalid size")
	}

	prot, flags := syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE
	addr, err := syscall.Mmap(-1, 0, length, prot, flags)
	if err != nil {
		return nil, fmt.Errorf("mmap error: %w", err)
	}

	return &Block{Data: addr}, nil
}

// Delete unmaps the memory mapped buffer.
//
// It returns an "invalid buffer" error when the receiver is nil or its
// buffer was already released. Data is set to nil before the munmap result
// is examined, so the block is never unmapped twice.
func (b *Block) Delete() error {
	if b == nil || b.Data == nil {
		return fmt.Errorf("invalid buffer")
	}

	err := syscall.Munmap(b.Data)
	b.Data = nil
	if err != nil {
		// if we get here, there is likely memory corruption.
		return fmt.Errorf("munmap error: %w", err)
	}

	return nil
}

// ReUse resets the block's metadata (Id, Index, Offset, Length) so it can
// carry a new chunk. The mapped buffer in Data is kept and its contents are
// not zeroed.
func (b *Block) ReUse() {
	b.Id = ""
	b.Index = 0
	b.Offset = 0
	b.Length = 0
}
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package core + +import "github.com/Azure/azure-storage-fuse/v2/common/log" + +// BlockPool is a pool of Blocks +type BlockPool struct { + // Channel holding free blocks + blocksCh chan *Block + + // Size of each block this pool holds + blockSize uint64 + + // Number of block that this pool can handle at max + maxBlocks uint32 +} + +// NewBlockPool allocates a new pool of blocks +func NewBlockPool(blockSize uint64, blockCount uint32) *BlockPool { + // Ignore if config is invalid + if blockSize == 0 { + log.Err("blockpool::NewBlockPool : blockSize : %v", blockSize) + return nil + } + + pool := &BlockPool{ + blocksCh: make(chan *Block, blockCount), + maxBlocks: uint32(blockCount), + blockSize: blockSize, + } + + // Preallocate all blocks so that during runtime we do not spend CPU cycles on this + for i := (uint32)(0); i < blockCount; i++ { + b, err := AllocateBlock(blockSize) + if err != nil { + return nil + } + + pool.blocksCh <- b + } + + return pool +} + +// Terminate ends the block pool life +func (pool *BlockPool) Terminate() { + close(pool.blocksCh) + + // Release back the memory allocated to each block + for { + b := <-pool.blocksCh + if b == nil { + break + } + _ = b.Delete() + } +} + +// Usage provides % usage of this block pool +func (pool *BlockPool) Usage() uint32 { + return ((pool.maxBlocks - (uint32)(len(pool.blocksCh))) * 100) / pool.maxBlocks +} + +// Get a Block from the pool, return back if nothing is available +func (pool *BlockPool) Get() *Block { + // getting a block from pool will be a blocking operation if the pool is empty + b := <-pool.blocksCh + + // Mark the buffer ready for reuse now + if b != nil { + b.ReUse() + } + return b +} + +// Release back the Block to the pool 
+func (pool *BlockPool) Release(b *Block) { + select { + case pool.blocksCh <- b: + break + default: + _ = b.Delete() + } +} diff --git a/xload/core/model/mode.go b/xload/core/model/mode.go new file mode 100644 index 000000000..4e7ca61c7 --- /dev/null +++ b/xload/core/model/mode.go @@ -0,0 +1,89 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. + Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package model + +import ( + "reflect" + + "github.com/JeffreyRichter/enum/enum" +) + +// One workitem to be processed +// type workItem struct { +// compName string // Name of the component +// path string // Name of the file being processed +// dataLen uint64 // Length of the data to be processed +// block *Block // Block to hold data for +// fileHandle *os.File // File handle to the file being processed +// err error // Error if any +// responseChannel chan *workItem // Channel to send the response +// download bool // boolean variable to decide upload or download +// } + +// xload mode enum +type Mode int + +var EMode = Mode(0).INVALID_MODE() + +func (Mode) INVALID_MODE() Mode { + return Mode(0) +} + +func (Mode) CHECKPOINT() Mode { + return Mode(1) +} + +func (Mode) DOWNLOAD() Mode { + return Mode(2) +} + +func (Mode) UPLOAD() Mode { + return Mode(3) +} + +func (Mode) SYNC() Mode { + return Mode(4) +} + +func (m Mode) String() string { + return enum.StringInt(m, reflect.TypeOf(m)) +} + +func (m *Mode) Parse(s string) error { + enumVal, err := enum.ParseInt(reflect.TypeOf(m), s, true, false) + if enumVal != nil { + *m = enumVal.(Mode) + } + return err +} diff --git a/xload/core/threadpool.go b/xload/core/threadpool.go new file mode 100644 index 000000000..c58cc7cd1 --- /dev/null +++ b/xload/core/threadpool.go @@ -0,0 +1,107 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. 
+ Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package core + +import ( + "sync" + + "github.com/Azure/azure-storage-fuse/v2/common/log" +) + +// ThreadPool is a group of workers that can be used to execute a task +type ThreadPool struct { + // Number of workers running in this group + worker uint32 + + // Wait group to wait for all workers to finish + wg sync.WaitGroup + + // Channel to hold pending requests + WorkItems chan *WorkItem + + // Reader method that will actually read the data + callback func(*WorkItem) (int, error) +} + +// newThreadPool creates a new thread pool +func NewThreadPool(count uint32, callback func(*WorkItem) (int, error)) *ThreadPool { + if count == 0 || callback == nil { + return nil + } + + return &ThreadPool{ + worker: count, + callback: callback, + WorkItems: make(chan *WorkItem, count*2), + } +} + +// Start all the workers and wait till they start receiving requests +func (t *ThreadPool) Start() { + for i := uint32(0); i < 
t.worker; i++ { + t.wg.Add(1) + go t.Do() + } +} + +// Stop all the workers threads +func (t *ThreadPool) Stop() { + close(t.WorkItems) + t.wg.Wait() +} + +// Schedule the download of a block +func (t *ThreadPool) Schedule(item *WorkItem) { + t.WorkItems <- item +} + +// Do is the core task to be executed by each worker thread +func (t *ThreadPool) Do() { + defer t.wg.Done() + + // This thread will work only on both high and low priority channel + for item := range t.WorkItems { + _, err := t.callback(item) + if err != nil { + // TODO:: xload : add retry logic + log.Err("ThreadPool::Do : Error in %s processing workitem %s : %v", item.CompName, item.Path, err) + } + + // add this error in response channel + if cap(item.ResponseChannel) > 0 { + item.Err = err + item.ResponseChannel <- item + } + } +} diff --git a/xload/core/workItem.go b/xload/core/workItem.go new file mode 100644 index 000000000..efa892a67 --- /dev/null +++ b/xload/core/workItem.go @@ -0,0 +1,56 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. + Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. 
+ + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package core + +import ( + "os" +) + +// One workitem to be processed +type WorkItem struct { + CompName string // Name of the component + Path string // Name of the file being processed + DataLen uint64 // Length of the data to be processed + Block *Block // Block to hold data for + FileHandle *os.File // File handle to the file being processed + Err error // Error if any + ResponseChannel chan *WorkItem // Channel to send the response + Download bool // boolean variable to decide upload or download +} + +func (item *WorkItem) IsDownloading() bool { + + return item.Download + +} diff --git a/xload/implementation/data_manager.go b/xload/implementation/data_manager.go new file mode 100644 index 000000000..422a37a2d --- /dev/null +++ b/xload/implementation/data_manager.go @@ -0,0 +1,133 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. 
+ Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package implementation + +import ( + "github.com/Azure/azure-storage-fuse/v2/common/log" + "github.com/Azure/azure-storage-fuse/v2/internal" + "github.com/Azure/azure-storage-fuse/v2/internal/handlemap" + "github.com/Azure/azure-storage-fuse/v2/xload/contract" + "github.com/Azure/azure-storage-fuse/v2/xload/core" +) + +// verify that the below types implement the contaract.Xcomponent interfaces +var _ contract.Xcomponent = &dataManager{} +var _ contract.Xcomponent = &remoteDataManager{} + +const ( + compName = "xload" + MAX_WORKER_COUNT = 64 + MAX_DATA_SPLITTER = 16 + MAX_LISTER = 16 + defaultBlockSize = 16 + DATA_MANAGER string = "DATA_MANAGER" +) + +type dataManager struct { + contract.Xbase +} + +// -------------------------------------------------------------------------------------------------------- + +type remoteDataManager struct { + dataManager +} + +func NewRemoteDataManager(remote internal.Component) 
(*remoteDataManager, error) { + log.Debug("data_manager::newRemoteDataManager : create new remote data manager") + + remoteDM := &remoteDataManager{ + dataManager: dataManager{ + + Xbase: contract.Xbase{}, + }, + } + + remoteDM.SetRemote(remote) + remoteDM.SetName(DATA_MANAGER) + remoteDM.init() + return remoteDM, nil +} + +func (rdm *remoteDataManager) init() { + rdm.SetThreadPool(core.NewThreadPool(MAX_WORKER_COUNT, rdm.process)) + if rdm.GetThreadPool() == nil { + log.Err("remoteDataManager::init : fail to init thread pool") + } +} + +func (rdm *remoteDataManager) start() { + log.Debug("remoteDataManager::start : start remote data manager") + rdm.GetThreadPool().Start() +} + +func (rdm *remoteDataManager) stop() { + log.Debug("remoteDataManager::stop : stop remote data manager") + if rdm.GetThreadPool() != nil { + rdm.GetThreadPool().Stop() + } +} + +// upload or download block +func (rdm *remoteDataManager) process(item *core.WorkItem) (int, error) { + if item.IsDownloading() { + return rdm.ReadData(item) + } else { + return rdm.WriteData(item) + } +} + +// ReadData reads data from the data manager +func (rdm *remoteDataManager) ReadData(item *core.WorkItem) (int, error) { + log.Debug("remoteDataManager::ReadData : Scheduling dow`nload for %s offset %v", item.Path, item.Block.Offset) + + h := handlemap.NewHandle(item.Path) + h.Size = int64(item.DataLen) + return rdm.GetRemote().ReadInBuffer(internal.ReadInBufferOptions{ + Handle: h, + Offset: item.Block.Offset, + Data: item.Block.Data, + }) +} + +// WriteData writes data to the data manager +func (rdm *remoteDataManager) WriteData(item *core.WorkItem) (int, error) { + log.Debug("remoteDataManager::WriteData : Scheduling upload for %s offset %v", item.Path, item.Block.Offset) + + return int(item.Block.Length), rdm.GetRemote().StageData(internal.StageDataOptions{ + Name: item.Path, + Data: item.Block.Data[0:item.Block.Length], + // Offset: uint64(item.block.offset), + Id: item.Block.Id}) +} diff --git 
a/xload/implementation/lister.go b/xload/implementation/lister.go new file mode 100644 index 000000000..e5c705208 --- /dev/null +++ b/xload/implementation/lister.go @@ -0,0 +1,199 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. + Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package implementation + +import ( + "os" + "path/filepath" + "time" + + "github.com/Azure/azure-storage-fuse/v2/common/config" + "github.com/Azure/azure-storage-fuse/v2/common/log" + "github.com/Azure/azure-storage-fuse/v2/internal" + "github.com/Azure/azure-storage-fuse/v2/xload/contract" + "github.com/Azure/azure-storage-fuse/v2/xload/core" +) + +// verify that the below types implement the contract.Xcomponent interfaces +var _ contract.Xcomponent = &lister{} +var _ contract.Xcomponent = &remoteLister{} + +// verify that the below types implement the xenumerator interfaces +var _ enumerator = &remoteLister{} + +const LISTER string = "lister" + +type lister struct { + contract.Xbase + path string // base path of the directory to be listed +} + +type enumerator interface { + mkdir(name string) error +} + +// -------------------------------------------------------------------------------------------------------- + +type remoteLister struct { + lister + listBlocked bool +} + +func NewRemoteLister(path string, remote internal.Component) (*remoteLister, error) { + log.Debug("lister::newRemoteLister : create new remote lister for %s", path) + + rl := &remoteLister{ + lister: lister{ + path: path, + }, + listBlocked: false, + } + rl.SetRemote(remote) + rl.SetName(LISTER) + rl.init() + return rl, nil + +} + +func (rl *remoteLister) init() { + rl.SetThreadPool(core.NewThreadPool(MAX_LISTER, rl.process)) + if rl.GetThreadPool() == nil { + log.Err("remoteLister::init : fail to init thread pool") + } +} + +func (rl *remoteLister) start() { + log.Debug("remoteLister::start : start remote lister for %s", rl.path) + rl.GetThreadPool().Start() + rl.GetThreadPool().Schedule(&core.WorkItem{CompName: 
rl.GetName()}) +} + +func (rl *remoteLister) stop() { + log.Debug("remoteLister::stop : stop remote lister for %s", rl.path) + if rl.GetRemote() != nil { + rl.GetThreadPool().Stop() + } + rl.GetNext().Start() +} + +// wait for the configured block-list-on-mount-sec to make the list call +func waitForListTimeout() error { + var blockListSeconds uint16 = 0 + err := config.UnmarshalKey("azstorage.block-list-on-mount-sec", &blockListSeconds) + if err != nil { + return err + } + time.Sleep(time.Duration(blockListSeconds) * time.Second) + return nil +} + +func (rl *remoteLister) process(item *core.WorkItem) (int, error) { + absPath := item.Path // TODO:: xload : check this for subdirectory mounting + + log.Debug("remoteLister::process : Reading remote dir %s", absPath) + + // this block will be executed only in the first list call for the remote directory + // so haven't made the listBlocked variable atomic + if !rl.listBlocked { + log.Debug("remoteLister::process : Waiting for block-list-on-mount-sec before making the list call") + err := waitForListTimeout() + if err != nil { + log.Err("remoteLister::process : unable to unmarshal block-list-on-mount-sec [%s]", err.Error()) + return 0, err + } + rl.listBlocked = true + } + + marker := "" + var cnt, iteration int + for { + entries, new_marker, err := rl.GetRemote().StreamDir(internal.StreamDirOptions{ + Name: absPath, + Token: marker, + }) + if err != nil { + log.Err("remoteLister::process : Remote listing failed for %s [%s]", absPath, err.Error()) + } + + marker = new_marker + cnt += len(entries) + iteration++ + log.Debug("remoteLister::process : count: %d , iterations: %d", cnt, iteration) + + for _, entry := range entries { + log.Debug("remoteLister::process : Iterating: %s, Is directory: %v", entry.Path, entry.IsDir()) + + if entry.IsDir() { + // create directory in local + // spawn go routine for directory creation and then + // adding to the input channel of the listing component + // TODO:: xload : check how many 
threads can we spawn + go func(name string) { + localPath := filepath.Join(rl.path, name) + err = rl.mkdir(localPath) + // TODO:: xload : handle error + if err != nil { + log.Err("remoteLister::process : Failed to create directory [%s]", err.Error()) + return + } + + // push the directory to input pool for its listing + rl.GetThreadPool().Schedule(&core.WorkItem{ + CompName: rl.GetName(), + Path: name, + }) + }(entry.Path) + } else { + // send file to the output channel for chunking + rl.GetNext().GetThreadPool().Schedule(&core.WorkItem{ + CompName: rl.GetNext().GetName(), + Path: entry.Path, + DataLen: uint64(entry.Size), + }) + } + } + + if len(new_marker) == 0 { + log.Debug("remoteLister::process : remote listing done for %s", absPath) + break + } + } + + return cnt, nil +} + +func (rl *remoteLister) mkdir(name string) error { + log.Debug("remoteLister::mkdir : Creating local path: %s", name) + return os.MkdirAll(name, 0777) +} diff --git a/xload/implementation/splitter.go b/xload/implementation/splitter.go new file mode 100644 index 000000000..cde118e7c --- /dev/null +++ b/xload/implementation/splitter.go @@ -0,0 +1,201 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. 
+ Author :
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE
+*/
+
+package implementation
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+
+	"github.com/Azure/azure-storage-fuse/v2/common/log"
+	"github.com/Azure/azure-storage-fuse/v2/internal"
+	"github.com/Azure/azure-storage-fuse/v2/xload/contract"
+	"github.com/Azure/azure-storage-fuse/v2/xload/core"
+)
+
+// verify that the below types implement the contract.Xcomponent interfaces
+var _ contract.Xcomponent = &splitter{}
+var _ contract.Xcomponent = &downloadSplitter{}
+
+// SPLITTER is the component name assigned to every splitter.
+const SPLITTER string = "splitter"
+
+// splitter holds the state shared by splitter components: the block size used
+// to cut a file into pieces, the pool the blocks are taken from and the local
+// base path the files are written under.
+type splitter struct {
+	contract.Xbase
+	blockSize uint64
+	blockPool *core.BlockPool
+	path      string
+}
+
+// --------------------------------------------------------------------------------------------------------
+
+// downloadSplitter cuts each remote file into blocks, schedules one download
+// per block on the next component and writes the returned data to a local file.
+type downloadSplitter struct {
+	splitter
+}
+
+// NewDownloadSplitter builds a download splitter that writes files under path
+// using blocks of blockSize bytes taken from blockPool, and creates its thread
+// pool. It returns an error when the thread pool could not be created.
+func NewDownloadSplitter(blockSize uint64, blockPool *core.BlockPool, path string, remote internal.Component) (*downloadSplitter, error) {
+	log.Debug("splitter::newDownloadSplitter : create new download splitter for %s, block size %v", path, blockSize)
+
+	d := &downloadSplitter{
+		splitter: splitter{
+			blockSize: blockSize,
+			blockPool: blockPool,
+			path:      path,
+		},
+	}
+	d.SetRemote(remote)
+	d.SetName(SPLITTER)
+	d.init()
+	// init only logs a missing thread pool; surface it to the caller so a
+	// half-initialised splitter is never put into the chain
+	if d.GetThreadPool() == nil {
+		return nil, fmt.Errorf("failed to init thread pool for download splitter")
+	}
+	return d, nil
+}
+
+// init creates the splitter's thread pool with process as the work callback.
+func (d *downloadSplitter) init() {
+	d.SetThreadPool(core.NewThreadPool(MAX_DATA_SPLITTER, d.process))
+	if d.GetThreadPool() == nil {
+		log.Err("downloadSplitter::init : fail to init thread pool")
+	}
+}
+
+// start starts the splitter's thread pool.
+func (d *downloadSplitter) start() {
+	log.Debug("downloadSplitter::start : start download splitter for %s", d.path)
+	d.GetThreadPool().Start()
+}
+
+// stop stops the splitter's thread pool, when one exists, and then stops the
+// next component in the chain.
+func (d *downloadSplitter) stop() {
+	log.Debug("downloadSplitter::stop : stop download splitter for %s", d.path)
+	if d.GetThreadPool() != nil {
+		d.GetThreadPool().Stop()
+	}
+	d.GetNext().Stop()
+}
+
+// process downloads the file described by item in blocks and writes it to
+// filepath.Join(d.path, item.Path).
+//
+// One work item per block is scheduled on the next component's thread pool; a
+// collector goroutine receives the responses, writes each block at its offset
+// and releases the block back to the pool. The file is finally truncated to
+// item.DataLen. A zero-length file is created empty without scheduling any
+// download.
+//
+// It returns (0, nil) on success or when item.Path is empty, and (-1, err)
+// when the block size is zero, the file cannot be opened, or any block fails;
+// in the last case the partially written local file is removed.
+func (d *downloadSplitter) process(item *core.WorkItem) (int, error) {
+	var err error
+
+	log.Debug("downloadSplitter::process : Splitting data for %s", item.Path)
+	if len(item.Path) == 0 {
+		return 0, nil
+	}
+
+	// a zero block size would make the block count computation divide by zero
+	if d.blockSize == 0 {
+		return -1, fmt.Errorf("invalid block size 0 for file %s", item.Path)
+	}
+
+	// DataLen is unsigned: (DataLen - 1) wraps around for an empty file, so
+	// only compute the ceiling division when there is data to download
+	var numBlocks uint64
+	if item.DataLen > 0 {
+		numBlocks = ((item.DataLen - 1) / d.blockSize) + 1
+	}
+	offset := int64(0)
+
+	// TODO:: xload : should we delete the file if it already exists
+	// TODO:: xload : what should be the flags and mode and should we allocate the full size to the file
+	item.FileHandle, err = os.OpenFile(filepath.Join(d.path, item.Path), os.O_WRONLY|os.O_CREATE, 0644)
+	if err != nil {
+		// create file
+		return -1, fmt.Errorf("failed to open file %s [%w]", item.Path, err)
+	}
+
+	defer item.FileHandle.Close()
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	// buffered for every block so that producers never block on the collector
+	responseChannel := make(chan *core.WorkItem, numBlocks)
+
+	// written only by the collector goroutine and read after wg.Wait()
+	operationSuccess := true
+	go func() {
+		defer wg.Done()
+
+		for i := 0; i < int(numBlocks); i++ {
+			respSplitItem := <-responseChannel
+			switch {
+			case respSplitItem.Err != nil:
+				log.Err("downloadSplitter::process : Failed to download data for file %s", item.Path)
+				operationSuccess = false
+			case respSplitItem.Block == nil:
+				// nothing to write; treat a response without data as a failure
+				// rather than dereferencing a nil block
+				log.Err("downloadSplitter::process : Failed to download data for file %s", item.Path)
+				operationSuccess = false
+			default:
+				_, err := item.FileHandle.WriteAt(respSplitItem.Block.Data, respSplitItem.Block.Offset)
+				if err != nil {
+					log.Err("downloadSplitter::process : Failed to write data to file %s", item.Path)
+					operationSuccess = false
+				} else {
+					log.Debug("downloadSplitter::process : Download successful %s index %d offset %v", item.Path, respSplitItem.Block.Index, respSplitItem.Block.Offset)
+				}
+			}
+
+			// the block goes back to the pool whether or not it was usable
+			if respSplitItem.Block != nil {
+				d.blockPool.Release(respSplitItem.Block)
+			}
+		}
+	}()
+
+	for i := 0; i < int(numBlocks); i++ {
+		block := d.blockPool.Get()
+		if block == nil {
+			// keep the collector's response count in step with numBlocks
+			responseChannel <- &core.WorkItem{Err: fmt.Errorf("failed to get block from pool for file %s, offset %v", item.Path, offset)}
+		} else {
+			block.Index = i
+			block.Offset = offset
+			block.Length = int64(d.blockSize)
+
+			splitItem := &core.WorkItem{
+				CompName:        d.GetNext().GetName(),
+				Path:            item.Path,
+				DataLen:         item.DataLen,
+				FileHandle:      item.FileHandle,
+				Block:           block,
+				ResponseChannel: responseChannel,
+				Download:        true,
+			}
+			log.Debug("downloadSplitter::process : Scheduling download for %s offset %v", item.Path, offset)
+			d.GetNext().GetThreadPool().Schedule(splitItem)
+		}
+
+		offset += int64(d.blockSize)
+	}
+
+	wg.Wait()
+	// whole blocks are written, so cut the file back to its real length
+	err = item.FileHandle.Truncate(int64(item.DataLen))
+	if err != nil {
+		log.Err("downloadSplitter::process : Failed to truncate file %s [%s]", item.Path, err.Error())
+		operationSuccess = false
+	}
+
+	if !operationSuccess {
+		log.Err("downloadSplitter::process : Failed to download data for file %s", item.Path)
+		log.Debug("downloadSplitter::process : deleting file %s", item.Path)
+
+		// delete the file which failed to download from the local path
+		err = os.Remove(filepath.Join(d.path, item.Path))
+		if err != nil {
+			log.Err("downloadSplitter::process : Unable to delete file %s [%s]", item.Path, err.Error())
+		}
+
+		return -1, fmt.Errorf("failed to download data for file %s", item.Path)
+	}
+
+	log.Debug("downloadSplitter::process : Download completed for file %s", item.Path)
+	return 0, nil
+}
diff --git a/xload/utils/helper.go b/xload/utils/helper.go
new file mode 100644
index 000000000..bc746bff0
--- /dev/null
+++ b/xload/utils/helper.go
@@ -0,0 +1,34 @@
+/*
+ _____ _____ _____ ____ ______ _____ ------
+ | | | | | | | | | | | | |
+ | | | | | | | | | | | | |
+ | --- | | | | |-----| |---- | | |-----| |----- ------
+ | | | | | | | | | | | | |
+ | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
+
+
+ Licensed under the MIT License .
+
+ Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
+ Author :
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE +*/ + +package utils diff --git a/xload/xload.go b/xload/xload.go new file mode 100644 index 000000000..cd1da7185 --- /dev/null +++ b/xload/xload.go @@ -0,0 +1,306 @@ +/* + _____ _____ _____ ____ ______ _____ ------ + | | | | | | | | | | | | | + | | | | | | | | | | | | | + | --- | | | | |-----| |---- | | |-----| |----- ------ + | | | | | | | | | | | | | + | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ + + + Licensed under the MIT License . + + Copyright © 2020-2024 Microsoft Corporation. All rights reserved. + Author : + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE
+*/
+
+package xload
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/Azure/azure-storage-fuse/v2/common"
+	"github.com/Azure/azure-storage-fuse/v2/common/config"
+	"github.com/Azure/azure-storage-fuse/v2/common/log"
+	"github.com/Azure/azure-storage-fuse/v2/internal"
+	"github.com/Azure/azure-storage-fuse/v2/xload/contract"
+	"github.com/Azure/azure-storage-fuse/v2/xload/core"
+	"github.com/Azure/azure-storage-fuse/v2/xload/core/model"
+	"github.com/Azure/azure-storage-fuse/v2/xload/implementation"
+)
+
+const _1MB uint64 = (1024 * 1024)
+
+// Xload is the pipeline component that owns the xload chain: it reads the
+// xload configuration, creates the block pool and builds and starts the
+// chain of xload sub-components for the configured mode.
+type Xload struct {
+	internal.BaseComponent
+	blockSize uint64     // Size of each block to be cached
+	mode      model.Mode // Mode of the Xload component
+
+	workerCount uint32                // Number of workers running
+	blockPool   *core.BlockPool       // Pool of blocks
+	path        string                // Path on local disk where Xload will operate
+	comps       []contract.Xcomponent // Chain of xload components, in processing order
+}
+
+// XloadOptions defines the config parameters read from the "xload" section.
+type XloadOptions struct {
+	BlockSize float64 `config:"block-size-mb" yaml:"block-size-mb,omitempty"`
+	Mode      string  `config:"mode" yaml:"mode,omitempty"`
+	Path      string  `config:"path" yaml:"path,omitempty"`
+	// TODO:: xload : add parallelism parameter
+}
+
+const (
+	compName          = "xload"
+	MAX_WORKER_COUNT  = 64
+	MAX_DATA_SPLITTER = 16
+	MAX_LISTER        = 16
+	defaultBlockSize  = 16
+)
+
+// Verification to check satisfaction criteria with Component Interface
+var _ internal.Component = &Xload{}
+
+// Name returns the name this component is registered under.
+func (xl *Xload) Name() string {
+	return compName
+}
+
+// SetName sets the component name on the embedded base component.
+func (xl *Xload) SetName(name string) {
+	xl.BaseComponent.SetName(name)
+}
+
+// SetNextComponent records the next component of the pipeline.
+func (xl *Xload) SetNextComponent(nc internal.Component) {
+	xl.BaseComponent.SetNextComponent(nc)
+}
+
+// Priority reports the mid-level priority for this component.
+func (xl *Xload) Priority() internal.ComponentPriority {
+	return internal.EComponentPriority.LevelMid()
+}
+
+// Configure : Pipeline will call this method after constructor so that you can read config and initialize yourself
+//
+// It requires the mount to be read-only, reads the xload options, validates
+// the block size, makes sure the xload path is set, differs from the mount
+// path, exists (creating it when missing) and is empty, and parses the mode.
+func (xl *Xload) Configure(_ bool) error {
+	log.Trace("Xload::Configure : %s", xl.Name())
+
+	// xload component should be used only in readonly mode
+	var readonly bool
+	err := config.UnmarshalKey("read-only", &readonly)
+	if err != nil {
+		log.Err("Xload::Configure : config error [unable to obtain read-only]")
+		return fmt.Errorf("config error in %s [%s]", xl.Name(), err.Error())
+	}
+
+	if !readonly {
+		log.Err("Xload::Configure : Xload component should be used only in read-only mode")
+		return fmt.Errorf("Xload component should be used only in read-only mode")
+	}
+
+	conf := XloadOptions{}
+	err = config.UnmarshalKey(xl.Name(), &conf)
+	if err != nil {
+		log.Err("Xload::Configure : config error [invalid config attributes]")
+		return fmt.Errorf("Xload: config error [invalid config attributes]")
+	}
+
+	xl.blockSize = uint64(defaultBlockSize) * _1MB // 16 MB as default block size
+	if config.IsSet(compName + ".block-size-mb") {
+		// reject zero, negative and sub-byte sizes: a zero block size makes
+		// the splitter divide by zero, and converting a negative float to
+		// uint64 is not well defined
+		xl.blockSize = 0
+		if conf.BlockSize > 0 {
+			xl.blockSize = uint64(conf.BlockSize * float64(_1MB))
+		}
+		if xl.blockSize == 0 {
+			log.Err("Xload::Configure : config error [invalid block-size-mb %v]", conf.BlockSize)
+			return fmt.Errorf("config error in %s [invalid block-size-mb]", xl.Name())
+		}
+	}
+
+	xl.path = common.ExpandPath(strings.TrimSpace(conf.Path))
+	if xl.path == "" {
+		// TODO:: xload : should we use current working directory in this case
+		log.Err("Xload::Configure : config error [path not given in xload]")
+		return fmt.Errorf("config error in %s [path not given]", xl.Name())
+	}
+
+	//check mnt path is not same as xload path
+	mntPath := ""
+	err = config.UnmarshalKey("mount-path", &mntPath)
+	if err != nil {
+		log.Err("Xload::Configure : config error [unable to obtain Mount Path [%s]]", err.Error())
+		return fmt.Errorf("config error in %s [%s]", xl.Name(), err.Error())
+	}
+
+	if xl.path == mntPath {
+		log.Err("Xload::Configure : config error [xload path is same as mount path]")
+		return fmt.Errorf("config error in %s error [xload path is same as mount path]", xl.Name())
+	}
+
+	_, err = os.Stat(xl.path)
+	switch {
+	case os.IsNotExist(err):
+		log.Info("Xload::Configure : config error [xload path does not exist, attempting to create path]")
+		// MkdirAll so that a path with missing parent directories also works
+		err = os.MkdirAll(xl.path, os.FileMode(0755))
+		if err != nil {
+			log.Err("Xload::Configure : config error creating directory of xload path [%s]", err.Error())
+			return fmt.Errorf("config error in %s [%s]", xl.Name(), err.Error())
+		}
+	case err != nil:
+		// e.g. permission denied: the path cannot be used, so fail early
+		log.Err("Xload::Configure : config error [unable to access xload path [%s]]", err.Error())
+		return fmt.Errorf("config error in %s [%s]", xl.Name(), err.Error())
+	}
+
+	if !common.IsDirectoryEmpty(xl.path) {
+		log.Err("Xload::Configure : config error %s directory is not empty", xl.path)
+		return fmt.Errorf("config error in %s [temp directory not empty]", xl.Name())
+	}
+
+	var mode model.Mode
+	err = mode.Parse(conf.Mode)
+	if err != nil {
+		log.Err("Xload::Configure : Failed to parse mode %s [%s]", conf.Mode, err.Error())
+		return fmt.Errorf("invalid mode in xload : %s", conf.Mode)
+	}
+
+	if mode == model.EMode.INVALID_MODE() {
+		log.Err("Xload::Configure : Invalid mode : %s", conf.Mode)
+		return fmt.Errorf("invalid mode in xload : %s", conf.Mode)
+	}
+
+	xl.mode = mode
+
+	return nil
+}
+
+// Start : Pipeline calls this method to start the component functionality
+//
+// It creates the block pool, builds the component chain for the configured
+// mode (only download is supported at present) and starts it. On any failure
+// the block pool is terminated again before the error is returned.
+func (xl *Xload) Start(ctx context.Context) error {
+	log.Trace("Xload::Start : Starting component %s", xl.Name())
+
+	xl.workerCount = MAX_WORKER_COUNT
+	xl.blockPool = core.NewBlockPool(xl.blockSize, xl.workerCount*3)
+	if xl.blockPool == nil {
+		log.Err("Xload::Start : Failed to create block pool")
+		return fmt.Errorf("failed to create block pool")
+	}
+
+	var err error
+
+	// Xload : start code goes here
+	switch xl.mode {
+	case model.EMode.CHECKPOINT():
+		// Start checkpoint thread here
+		err = fmt.Errorf("checkpoint is currently unsupported")
+	case model.EMode.DOWNLOAD():
+		// Start downloader here
+		err = xl.startDownloader()
+		if err != nil {
+			log.Err("Xload::Start : Failed to start downloader [%s]", err.Error())
+		}
+	case model.EMode.UPLOAD():
+		// Start uploader here
+		err = fmt.Errorf("uploader is currently unsupported")
+	case model.EMode.SYNC():
+		//Start syncer here
+		err = fmt.Errorf("sync is currently unsupported")
+	default:
+		log.Err("Xload::Start : Invalid mode : %s", xl.mode.String())
+		err = fmt.Errorf("invalid mode in xload : %s", xl.mode.String())
+	}
+
+	if err == nil {
+		err = xl.startComponents()
+	}
+
+	if err != nil {
+		// do not leave the pool allocated when the component failed to start
+		xl.blockPool.Terminate()
+		xl.blockPool = nil
+		return err
+	}
+
+	return nil
+}
+
+// Stop : Stop the component functionality and kill all threads started
+//
+// It is safe to call when Start failed or was never called: the chain and the
+// block pool are only touched when they exist.
+func (xl *Xload) Stop() error {
+	log.Trace("Xload::Stop : Stopping component %s", xl.Name())
+
+	// stopping the head of the chain is expected to cascade to the rest
+	if len(xl.comps) > 0 {
+		xl.comps[0].Stop()
+	}
+
+	if xl.blockPool != nil {
+		xl.blockPool.Terminate()
+	}
+
+	// TODO:: xload : should we delete the files from local path
+	err := common.TempCacheCleanup(xl.path)
+	if err != nil {
+		log.Err("unable to clean xload local path [%s]", err.Error())
+	}
+
+	return nil
+}
+
+// startDownloader creates the download chain (remote lister, download
+// splitter, remote data manager) and stores it in xl.comps. The components
+// are linked and started later by startComponents.
+func (xl *Xload) startDownloader() error {
+	log.Trace("Xload::startDownloader : Starting downloader")
+
+	// Create remote lister pool to list remote files
+	rl, err := implementation.NewRemoteLister(xl.path, xl.NextComponent())
+	if err != nil {
+		log.Err("Xload::startDownloader : Unable to create remote lister [%s]", err.Error())
+		return err
+	}
+
+	ds, err := implementation.NewDownloadSplitter(xl.blockSize, xl.blockPool, xl.path, xl.NextComponent())
+	if err != nil {
+		log.Err("Xload::startDownloader : Unable to create download splitter [%s]", err.Error())
+		return err
+	}
+
+	rdm, err := implementation.NewRemoteDataManager(xl.NextComponent())
+	if err != nil {
+		log.Err("Xload::startDownloader : failed to create remote data manager [%s]", err.Error())
+		return err
+	}
+
+	xl.comps = []contract.Xcomponent{rl, ds, rdm}
+	return nil
+}
+
+// createChain links every component in xl.comps to its successor. It returns
+// an error when no component has been created.
+func (xl *Xload) createChain() error {
+	if len(xl.comps) == 0 {
+		log.Err("Xload::createChain : no component initialized in xload")
+		return fmt.Errorf("no component initialized in xload")
+	}
+
+	currComp := xl.comps[0]
+
+	for i := 1; i < len(xl.comps); i++ {
+		nextComp := xl.comps[i]
+		currComp.SetNext(nextComp)
+		currComp = nextComp
+	}
+
+	return nil
+}
+
+// startComponents links the chain and starts the components from the last one
+// to the first, so that each consumer is started before its producer.
+func (xl *Xload) startComponents() error {
+	err := xl.createChain()
+	if err != nil {
+		log.Err("Xload::startComponents : failed to create chain [%s]", err.Error())
+		return err
+	}
+
+	for i := len(xl.comps) - 1; i >= 0; i-- {
+		xl.comps[i].Start()
+	}
+
+	return nil
+}
+
+// ------------------------- Factory -------------------------------------------
+
+// Pipeline will call this method to create your object, initialize your variables here
+func NewXloadComponent() internal.Component {
+	comp := &Xload{}
+	comp.SetName(compName)
+	return comp
+}
+
+// On init register this component to pipeline and supply your constructor
+func init() {
+	internal.AddComponent(compName, NewXloadComponent)
+}
From f83bad7361b9d0c9b28842f18e50c98fa91b38b1 Mon Sep 17 00:00:00 2001
From: Akanksha Jain
Date: Thu, 26 Dec 2024 16:57:09 +0530
Subject: [PATCH 2/2] Removed unused files

---
 xload/adapter/http_loader.go | 56 ------------------------------------
 1 file changed, 56 deletions(-)
 delete mode 100644 xload/adapter/http_loader.go

diff --git a/xload/adapter/http_loader.go b/xload/adapter/http_loader.go
deleted file mode 100644
index cc885079e..000000000
--- a/xload/adapter/http_loader.go
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- _____ _____ _____ ____ ______ _____ ------
- | | | | | | | | | | | | |
- | | | | | | | | | | | | |
- | --- | | | | |-----| |---- | | |-----| |----- ------
- | | | | | | | | | | | | |
- | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
-
-
- Licensed under the MIT License .
-
- Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
- Author : - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE -*/ - -package adapter - -import ( - "xload/model" - "xload/utils" -) - -type HTTPLoader struct { - // Dependencies like HTTP client -} - -func NewHTTPLoader() *HTTPLoader { - return &HTTPLoader{} -} - -func (h *HTTPLoader) Load(data model.LoadSpec) error { - // Implement HTTP loading logic - err := utils.FetchData(data.URL) - if err != nil { - return err - } - return nil -}