From c3db30375eccc99dc7124ce14094b90051e6084c Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Thu, 2 May 2024 12:47:36 -0700 Subject: [PATCH 1/2] migration: Add access_ref helper --- src/datastore.jl | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/datastore.jl b/src/datastore.jl index 5d4e9fa..0b436ed 100644 --- a/src/datastore.jl +++ b/src/datastore.jl @@ -519,6 +519,9 @@ end function poolget(ref::DRef) DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "?? (", ref.owner, ", ", ref.id, ") at ", myid(), "\n") + return access_ref(identity, ref) +end +function access_ref(f, ref::DRef, args...; local_only::Bool=false) original_ref = ref # Check global redirect cache @@ -529,11 +532,11 @@ function poolget(ref::DRef) # Fetch the value (or a RedirectTo) from the owner @label fetch value = if ref.owner == myid() - _getlocal(ref.id, false) + _getlocal(f, ref.id, false, args...; local_only, from=myid()) else forwardkeyerror() do - remotecall_fetch(ref.owner, ref) do ref - MMWrap(_getlocal(ref.id, true)) + remotecall_fetch(ref.owner, ref, args) do ref, args + MMWrap(_getlocal(f, ref.id, true, args...; local_only, from=myid())) end end end |> unwrap_payload @@ -551,13 +554,17 @@ function poolget(ref::DRef) return something(value) end -function _getlocal(id, remote) +function _getlocal(f, id, remote, args...; local_only::Bool, from::Int) state = with_lock(()->datastore[id], datastore_lock) lock_read(state.lock) do if state.redirect !== nothing return RedirectTo(state.redirect) end - return Some{Any}(read_from_device(state, id, true)) + if local_only && from != myid() + throw(ConcurrencyViolationError("Attempted to access a DRef from a worker that does not own it")) + end + value = read_from_device(state, id, true) + return Some{Any}(f(value, args...)) end end @@ -598,10 +605,10 @@ Migrate the data referenced by `ref` to another worker `to`, returning the new any accesses via `poolget` will seamlessly redirect to the new `DRef`, but the data is no longer stored on the same worker as `ref`. """ -function migrate!(ref::DRef, to::Integer) +function migrate!(ref::DRef, to::Integer; pre_migration=nothing, post_migration=nothing) @assert ref.owner != to "Cannot migrate a DRef within the same worker" if ref.owner != myid() - return remotecall_fetch(migrate!, ref.owner, ref, to) + return remotecall_fetch(migrate!, ref.owner, ref, to; pre_migration, post_migration) end state = with_lock(()->datastore[ref.id], datastore_lock) @@ -612,6 +619,11 @@ function migrate!(ref::DRef, to::Integer) # Read the current value of the ref data = read_from_device(state, ref, true) + # Prepare data for migration + if pre_migration !== nothing + pre_migration(data) + end + # Create new ref to redirect to new_ref = remotecall_fetch(poolset, to, data) @@ -620,6 +632,11 @@ function migrate!(ref::DRef, to::Integer) # Delete the local data delete_from_device!(state, ref) + + # Clean up old data if requested + if post_migration !== nothing + post_migration(data) + end end return new_ref From 1af66961b0616231b22968eecd24ae52ea9aa743 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Mon, 5 Aug 2024 13:41:12 -0500 Subject: [PATCH 2/2] migration: Add dest_post_migration callback --- src/datastore.jl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/datastore.jl b/src/datastore.jl index 0b436ed..3e3881b 100644 --- a/src/datastore.jl +++ b/src/datastore.jl @@ -605,10 +605,10 @@ Migrate the data referenced by `ref` to another worker `to`, returning the new any accesses via `poolget` will seamlessly redirect to the new `DRef`, but the data is no longer stored on the same worker as `ref`. """ -function migrate!(ref::DRef, to::Integer; pre_migration=nothing, post_migration=nothing) +function migrate!(ref::DRef, to::Integer; pre_migration=nothing, dest_post_migration=nothing, post_migration=nothing) @assert ref.owner != to "Cannot migrate a DRef within the same worker" if ref.owner != myid() - return remotecall_fetch(migrate!, ref.owner, ref, to; pre_migration, post_migration) + return remotecall_fetch(migrate!, ref.owner, ref, to; pre_migration, dest_post_migration, post_migration) end state = with_lock(()->datastore[ref.id], datastore_lock) @@ -620,12 +620,20 @@ function migrate!(ref::DRef, to::Integer; pre_migration=nothing, post_migration= data = read_from_device(state, ref, true) # Prepare data for migration - if pre_migration !== nothing + pre_migration_data = if pre_migration !== nothing pre_migration(data) + else + nothing end # Create new ref to redirect to - new_ref = remotecall_fetch(poolset, to, data) + new_ref = remotecall_fetch(to, data) do data + new_ref = poolset(data) + if dest_post_migration !== nothing + access_ref(dest_post_migration, new_ref, pre_migration_data; local_only=true) + end + return new_ref + end # Set the redirect to our new ref state.redirect = new_ref