Skip to content

Commit

Permalink
Merge pull request #90 from JuliaData/jps/migration-helper
Browse files Browse the repository at this point in the history
Add migration helper
  • Loading branch information
jpsamaroo authored Nov 15, 2024
2 parents 9fb85ee + 1af6696 commit a7b54b8
Showing 1 changed file with 33 additions and 8 deletions.
41 changes: 33 additions & 8 deletions src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,9 @@ end

function poolget(ref::DRef)
DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "?? (", ref.owner, ", ", ref.id, ") at ", myid(), "\n")
return access_ref(identity, ref)
end
function access_ref(f, ref::DRef, args...; local_only::Bool=false)
original_ref = ref

# Check global redirect cache
Expand All @@ -529,11 +532,11 @@ function poolget(ref::DRef)
# Fetch the value (or a RedirectTo) from the owner
@label fetch
value = if ref.owner == myid()
_getlocal(ref.id, false)
_getlocal(f, ref.id, false, args...; local_only, from=myid())
else
forwardkeyerror() do
remotecall_fetch(ref.owner, ref) do ref
MMWrap(_getlocal(ref.id, true))
remotecall_fetch(ref.owner, ref, args) do ref, args
MMWrap(_getlocal(f, ref.id, true, args...; local_only, from=myid()))
end
end
end |> unwrap_payload
Expand All @@ -551,13 +554,17 @@ function poolget(ref::DRef)
return something(value)
end

function _getlocal(id, remote)
function _getlocal(f, id, remote, args...; local_only::Bool, from::Int)
state = with_lock(()->datastore[id], datastore_lock)
lock_read(state.lock) do
if state.redirect !== nothing
return RedirectTo(state.redirect)
end
return Some{Any}(read_from_device(state, id, true))
if local_only && from != myid()
throw(ConcurrencyViolationError("Attempted to access a DRef from a worker that does not own it"))
end
value = read_from_device(state, id, true)
return Some{Any}(f(value, args...))
end
end

Expand Down Expand Up @@ -598,10 +605,10 @@ Migrate the data referenced by `ref` to another worker `to`, returning the new
any accesses via `poolget` will seamlessly redirect to the new `DRef`, but the
data is no longer stored on the same worker as `ref`.
"""
function migrate!(ref::DRef, to::Integer)
function migrate!(ref::DRef, to::Integer; pre_migration=nothing, dest_post_migration=nothing, post_migration=nothing)
@assert ref.owner != to "Cannot migrate a DRef within the same worker"
if ref.owner != myid()
return remotecall_fetch(migrate!, ref.owner, ref, to)
return remotecall_fetch(migrate!, ref.owner, ref, to; pre_migration, dest_post_migration, post_migration)
end
state = with_lock(()->datastore[ref.id], datastore_lock)

Expand All @@ -612,14 +619,32 @@ function migrate!(ref::DRef, to::Integer)
# Read the current value of the ref
data = read_from_device(state, ref, true)

# Prepare data for migration
pre_migration_data = if pre_migration !== nothing
pre_migration(data)
else
nothing
end

# Create new ref to redirect to
new_ref = remotecall_fetch(poolset, to, data)
new_ref = remotecall_fetch(to, data) do data
new_ref = poolset(data)
if dest_post_migration !== nothing
access_ref(dest_post_migration, new_ref, pre_migration_data; local_only=true)
end
return new_ref
end

# Set the redirect to our new ref
state.redirect = new_ref

# Delete the local data
delete_from_device!(state, ref)

# Clean up old data if requested
if post_migration !== nothing
post_migration(data)
end
end

return new_ref
Expand Down

2 comments on commit a7b54b8

@jpsamaroo
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register()

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error while trying to register: Version 0.4.9 already exists

Please sign in to comment.