Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored code #1599

Draft
wants to merge 2 commits into
base: sourav/prefetch
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions xload/contract/xcomponent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____


Licensed under the MIT License <http://opensource.org/licenses/MIT>.

Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/

package contract

import (
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/xload/core"
)

type Xcomponent interface {
Init()
Start()
Stop()
Process(item *core.WorkItem) (int, error)
GetNext() Xcomponent
SetNext(s Xcomponent)
GetThreadPool() *core.ThreadPool
GetRemote() internal.Component
GetName() string
SetName(s string)
SetRemote(remote internal.Component)
SetThreadPool(pool *core.ThreadPool)
}

type Xbase struct {
name string
pool *core.ThreadPool
remote internal.Component
next Xcomponent
}

// SetThreadPool implements Xcomponent.
func (xb *Xbase) SetThreadPool(pool *core.ThreadPool) {
xb.pool = pool
}

// SetRemote implements Xcomponent.
func (xb *Xbase) SetRemote(remote internal.Component) {
xb.remote = remote
}

var _ Xcomponent = &Xbase{}

func (xb *Xbase) Init() {
}

func (xb *Xbase) Start() {
}

func (xb *Xbase) Stop() {
}

func (xb *Xbase) Process(item *core.WorkItem) (int, error) {
return 0, nil
}

func (xb *Xbase) GetNext() Xcomponent {
return xb.next
}

func (xb *Xbase) SetNext(s Xcomponent) {
xb.next = s
}

func (xb *Xbase) GetThreadPool() *core.ThreadPool {
return xb.pool
}

func (xb *Xbase) GetRemote() internal.Component {
return xb.remote
}

func (xb *Xbase) GetName() string {
return xb.name
}

func (xb *Xbase) SetName(s string) {
xb.name = s
}
96 changes: 96 additions & 0 deletions xload/core/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____


Licensed under the MIT License <http://opensource.org/licenses/MIT>.

Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/

package core

import (
"fmt"
"syscall"
)

// Block is a memory mapped buffer with its state to hold data
type Block struct {
Index int // Index of the block in the pool
Offset int64 // Start offset of the data this block holds
Length int64 // Length of data that this block holds
Id string // ID to represent this block in the blob
Data []byte // Data this block holds
}

// AllocateBlock creates a new memory mapped buffer for the given size
func AllocateBlock(size uint64) (*Block, error) {
if size == 0 {
return nil, fmt.Errorf("invalid size")
}

prot, flags := syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE
addr, err := syscall.Mmap(-1, 0, int(size), prot, flags)

if err != nil {
return nil, fmt.Errorf("mmap error: %v", err)
}

block := &Block{
Index: 0,
Data: addr,
Offset: 0,
Length: 0,
Id: "",
}

return block, nil
}

// Delete cleans up the memory mapped buffer
func (b *Block) Delete() error {
if b.Data == nil {
return fmt.Errorf("invalid buffer")
}

err := syscall.Munmap(b.Data)
b.Data = nil
if err != nil {
// if we get here, there is likely memory corruption.
return fmt.Errorf("munmap error: %v", err)
}

return nil
}

// Clear the old data of this block
func (b *Block) ReUse() {
b.Id = ""
b.Index = 0
b.Offset = 0
b.Length = 0
}
116 changes: 116 additions & 0 deletions xload/core/blockpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____


Licensed under the MIT License <http://opensource.org/licenses/MIT>.

Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/

package core

import "github.com/Azure/azure-storage-fuse/v2/common/log"

// BlockPool is a pool of Blocks
type BlockPool struct {
// Channel holding free blocks
blocksCh chan *Block

// Size of each block this pool holds
blockSize uint64

// Number of block that this pool can handle at max
maxBlocks uint32
}

// NewBlockPool allocates a new pool of blocks
func NewBlockPool(blockSize uint64, blockCount uint32) *BlockPool {
// Ignore if config is invalid
if blockSize == 0 {
log.Err("blockpool::NewBlockPool : blockSize : %v", blockSize)
return nil
}

pool := &BlockPool{
blocksCh: make(chan *Block, blockCount),
maxBlocks: uint32(blockCount),
blockSize: blockSize,
}

// Preallocate all blocks so that during runtime we do not spend CPU cycles on this
for i := (uint32)(0); i < blockCount; i++ {
b, err := AllocateBlock(blockSize)
if err != nil {
return nil
}

pool.blocksCh <- b
}

return pool
}

// Terminate ends the block pool life
func (pool *BlockPool) Terminate() {
close(pool.blocksCh)

// Release back the memory allocated to each block
for {
b := <-pool.blocksCh
if b == nil {
break
}
_ = b.Delete()
}
}

// Usage provides % usage of this block pool
func (pool *BlockPool) Usage() uint32 {
return ((pool.maxBlocks - (uint32)(len(pool.blocksCh))) * 100) / pool.maxBlocks
}

// Get a Block from the pool, return back if nothing is available
func (pool *BlockPool) Get() *Block {
// getting a block from pool will be a blocking operation if the pool is empty
b := <-pool.blocksCh

// Mark the buffer ready for reuse now
if b != nil {
b.ReUse()
}
return b
}

// Release back the Block to the pool
func (pool *BlockPool) Release(b *Block) {
select {
case pool.blocksCh <- b:
break
default:
_ = b.Delete()
}
}
Loading
Loading