Skip to content

Commit

Permalink
Merge pull request #75 from JuliaData/jps/memory-bloat
Browse files Browse the repository at this point in the history
poolset: Call GC when free mem is low
  • Loading branch information
jpsamaroo authored Jan 13, 2024
2 parents 93ae638 + 7a41006 commit bd89772
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 4 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
strategy:
matrix:
julia-version:
- '~1.7'
- '~1.8'
- '~1.9'
- 'nightly'
Expand Down
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
DataStructures = "0.18"
julia = "1.7"
ScopedValues = "1"
julia = "1.8"

[extras]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Expand Down
5 changes: 5 additions & 0 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ using Serialization, Sockets, Random
import Serialization: serialize, deserialize
export DRef, FileRef, poolset, poolget, mmwrite, mmread, cleanup
import .Threads: ReentrantLock
using ScopedValues

## Wrapping-unwrapping of payloads:

Expand Down Expand Up @@ -117,6 +118,10 @@ function __init__()
DISKCACHE_CONFIG[] = diskcache_config = DiskCacheConfig()
setup_global_device!(diskcache_config)

if haskey(ENV, "JULIA_MEMPOOL_MEMORY_RESERVED")
MEM_RESERVED[] = parse(UInt, ENV["JULIA_MEMPOOL_MEMORY_RESERVED"])
end

# Ensure we cleanup all references
atexit(exit_hook)
end
Expand Down
66 changes: 65 additions & 1 deletion src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,18 @@ end
mutable struct SendQueue
queue::Channel{Any}
@atomic task::Union{Task,Nothing}
processing::Bool
end
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing)
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing, false)
function _enqueue_work(f, args...; gc_context=false)
if SEND_QUEUE.task === nothing
task = Task() do
while true
try
work, _args = take!(SEND_QUEUE.queue)
SEND_QUEUE.processing = true
work(_args...)
SEND_QUEUE.processing = false
catch err
exit_flag[] && continue
err isa ProcessExitedException && continue # TODO: Remove proc from counters
Expand Down Expand Up @@ -348,12 +351,73 @@ isondisk(id::Int) =
isinmemory(x::DRef) = isinmemory(x.id)
isondisk(x::DRef) = isondisk(x.id)

const MEM_RESERVED = Ref{UInt}(512 * (1024^2)) # Reserve 512MB of RAM for OS
const MEM_RESERVE_LOCK = Threads.ReentrantLock()

"""
When called, ensures that at least `MEM_RESERVED[] + size` bytes are available
to the OS. If there is not enough memory available, then a variety of calls to
the GC are performed to free up memory until either the reservation limit is
satisfied, or `max_sweeps` number of cycles have elapsed.
"""
function ensure_memory_reserved(size::Integer=0; max_sweeps::Integer=5)
sat_sub(x::T, y::T) where T = x < y ? zero(T) : x-y

# Check whether the OS is running tight on memory
sweep_ctr = 0
while true
with(QUERY_MEM_OVERRIDE => true) do
Int(storage_available(CPURAMResource())) - size < MEM_RESERVED[]
end || break

# We need more memory! Let's encourage the GC to clear some memory...
sweep_start = time_ns()
mem_used = with(QUERY_MEM_OVERRIDE => true) do
storage_utilized(CPURAMResource())
end
if sweep_ctr == 0
@debug "Not enough memory to continue! Sweeping up unused memory..."
GC.gc(false)
elseif sweep_ctr == 1
GC.gc(true)
else
@everywhere GC.gc(true)
end

# Let finalizers run
yield()

# Wait for send queue to clear
while SEND_QUEUE.processing
yield()
end

with(QUERY_MEM_OVERRIDE => true) do
mem_freed = sat_sub(mem_used, storage_utilized(CPURAMResource()))
@debug "Freed $(Base.format_bytes(mem_freed)) bytes, available: $(Base.format_bytes(storage_available(CPURAMResource())))"
end

sweep_ctr += 1
if sweep_ctr == max_sweeps
@debug "Made too many sweeps, bailing out..."
break
end
end
if sweep_ctr > 0
@debug "Swept for $sweep_ctr cycles"
end
end

function poolset(@nospecialize(x), pid=myid(); size=approx_size(x),
retain=false, restore=false,
device=GLOBAL_DEVICE[], leaf_device=initial_leaf_device(device),
tag=nothing, leaf_tag=Tag(),
destructor=nothing)
if pid == myid()
if !restore
@lock MEM_RESERVE_LOCK ensure_memory_reserved(size)
end

id = atomic_add!(id_counter, 1)
sstate = if !restore
StorageState(Some{Any}(x),
Expand Down
3 changes: 2 additions & 1 deletion src/storage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ QueriedMemInfo() = QueriedMemInfo(UInt64(0), UInt64(0))
const QUERY_MEM_AVAILABLE = Ref(QueriedMemInfo())
const QUERY_MEM_CAPACITY = Ref(QueriedMemInfo())
const QUERY_MEM_PERIOD = 10 * 1000^2 # 10ms
const QUERY_MEM_OVERRIDE = ScopedValue(false)
function _query_mem_periodically(kind::Symbol)
if !(kind in (:available, :capacity))
throw(ArgumentError("Invalid memory query kind: $kind"))
Expand All @@ -197,7 +198,7 @@ function _query_mem_periodically(kind::Symbol)
end
mem_info = mem_bin[]
now_ns = time_ns()
if mem_info.last_ns < now_ns - QUERY_MEM_PERIOD
if QUERY_MEM_OVERRIDE[] || mem_info.last_ns < now_ns - QUERY_MEM_PERIOD
if kind == :available
new_mem_info = QueriedMemInfo(free_memory(), now_ns)
elseif kind == :capacity
Expand Down

0 comments on commit bd89772

Please sign in to comment.