Skip to content

Commit

Permalink
Merge pull request #91 from JamesWrigley/distributednext
Browse files Browse the repository at this point in the history
Add support for using MemPool with DistributedNext.jl
  • Loading branch information
jpsamaroo authored Dec 5, 2024
2 parents 88246ea + 554aa18 commit 96a7069
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 7 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
strategy:
matrix:
julia-version:
- '1.8'
- '1.9'
- '1.10'
- '1.11'
Expand Down
7 changes: 6 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ version = "0.4.10"
ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
DistributedNext = "fab6aee4-877b-4bac-a744-3eca44acbb6f"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
Preferences = "21216c6a-2e73-6563-6e65-726566657250"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Expand All @@ -18,12 +20,15 @@ UnsafeAtomics = "013be700-e6cd-48c3-b4a1-df204f14c38f"
[compat]
ConcurrentCollections = "0.1"
DataStructures = "0.18"
DistributedNext = "1"
Preferences = "1"
ScopedValues = "1"
UnsafeAtomics = "0.2"
julia = "1.8"
julia = "1.9"

[extras]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Preferences = "21216c6a-2e73-6563-6e65-726566657250"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
Expand Down
15 changes: 15 additions & 0 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module MemPool

import Preferences: @load_preference, @set_preferences!
using Serialization, Sockets, Random
import Serialization: serialize, deserialize
export DRef, FileRef, poolset, poolget, mmwrite, mmread, cleanup
Expand Down Expand Up @@ -52,6 +53,20 @@ unwrap_payload(f::FileRef) = unwrap_payload(open(deserialize, f.file, "r+"))

approx_size(f::FileRef) = f.size

# Preferences settings

"""
set_distributed_package!(value[="Distributed|DistributedNext"])
Set a [preference](https://github.com/JuliaPackaging/Preferences.jl) for using
either the Distributed.jl stdlib or DistributedNext.jl. You will need to restart
Julia after setting a new preference.
"""
function set_distributed_package!(value)
@set_preferences!("distributed-package" => value)
@info "MemPool.jl preference has been set, restart your Julia session for this change to take effect!"
end

include("io.jl")
include("lock.jl")
include("read_write_lock.jl")
Expand Down
16 changes: 11 additions & 5 deletions src/datastore.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
using Distributed
if @load_preference("distributed-package") == "DistributedNext"
using DistributedNext
import DistributedNext: ClusterSerializer, worker_id_from_socket
else
using Distributed
import Distributed: ClusterSerializer, worker_id_from_socket
end

mutable struct DRef
owner::Int
Expand Down Expand Up @@ -26,13 +32,13 @@ function Serialization.serialize(io::AbstractSerializer, d::DRef)

_pooltransfer_send(io, d)
end
function _pooltransfer_send(io::Distributed.ClusterSerializer, d::DRef)
pid = Distributed.worker_id_from_socket(io.io)
function _pooltransfer_send(io::ClusterSerializer, d::DRef)
pid = worker_id_from_socket(io.io)
if pid != -1
pooltransfer_send_local(d, pid)
return
end
pid = Distributed.worker_id_from_socket(io)
pid = worker_id_from_socket(io)
if pid != -1
pooltransfer_send_local(d, pid)
return
Expand Down Expand Up @@ -60,7 +66,7 @@ function Serialization.deserialize(io::AbstractSerializer, dt::Type{DRef})
_pooltransfer_recv(io, d)
return d
end
function _pooltransfer_recv(io::Distributed.ClusterSerializer, d)
function _pooltransfer_recv(io::ClusterSerializer, d)
# Add a new reference manually, and unref on finalization
DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "<- (", d.owner, ", ", d.id, ") at ", myid(), "\n")
poolref(d, true)
Expand Down
11 changes: 11 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,17 @@ end
MemPool.retain_on_device!(sdevice, x1, false)
end

@testset "Preferences" begin
cmd = `$(Base.julia_cmd()) --startup-file=no --project -E 'using MemPool; parentmodule(MemPool.addprocs)'`

cd(dirname(Base.active_project())) do
@test readchomp(cmd) == "Distributed"

MemPool.set_distributed_package!("DistributedNext")
@test readchomp(cmd) == "DistributedNext"
end
end

#= TODO
Allocate, write non-CPU A, write non-CPU B, handle for A was explicitly deleted
Allocate, chain write and reads such that write starts before, and finishes after, read, ensure ordering is correct
Expand Down

0 comments on commit 96a7069

Please sign in to comment.