From 7d4e8cb051fa2bbc462159a792045591aff9391e Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Sat, 2 Dec 2023 15:59:34 +0000 Subject: [PATCH] re-enabled migration --- include/yokan/backend.hpp | 2 +- include/yokan/common.h | 5 +- include/yokan/server.h | 16 ++ spack.yaml | 9 +- src/CMakeLists.txt | 1 + src/backends/tkrzw.cpp | 1 + src/common/checks.h | 1 + src/common/types.h | 5 + src/server/get_remi_provider_id.cpp | 46 ++++++ src/server/migration.hpp | 24 ++- src/server/provider.cpp | 205 +++++++++++++++--------- src/server/provider.hpp | 4 + tests/test-migration.cpp | 238 ++++++++++++++++++++++++++++ 13 files changed, 471 insertions(+), 86 deletions(-) create mode 100644 src/server/get_remi_provider_id.cpp create mode 100644 tests/test-migration.cpp diff --git a/include/yokan/backend.hpp b/include/yokan/backend.hpp index 94788f6..4810302 100644 --- a/include/yokan/backend.hpp +++ b/include/yokan/backend.hpp @@ -777,7 +777,7 @@ template class __YOKANBackendRegistration { template using void_t = void; template using recover_t = - decltype(U::recover("", "", "", {}, nullptr)); + decltype(U::recover("", "", {}, nullptr)); template> struct has_recover : std::false_type {}; diff --git a/include/yokan/common.h b/include/yokan/common.h index 59ff744..f5529e0 100644 --- a/include/yokan/common.h +++ b/include/yokan/common.h @@ -22,11 +22,10 @@ extern "C" { X(YOKAN_ERR_ALLOCATION, "Allocation error") \ X(YOKAN_ERR_INVALID_MID, "Invalid margo instance") \ X(YOKAN_ERR_INVALID_ARGS, "Invalid argument") \ - X(YOKAN_ERR_INVALID_PROVIDER, "Invalid provider id") \ - X(YOKAN_ERR_INVALID_DATABASE, "Invalid database id") \ + X(YOKAN_ERR_INVALID_PROVIDER, "Invalid provider") \ + X(YOKAN_ERR_INVALID_DATABASE, "Invalid database") \ X(YOKAN_ERR_INVALID_BACKEND, "Invalid backend type") \ X(YOKAN_ERR_INVALID_CONFIG, "Invalid configuration") \ - X(YOKAN_ERR_INVALID_TOKEN, "Invalid token") \ X(YOKAN_ERR_INVALID_ID, "Invalid document id") \ X(YOKAN_ERR_INVALID_FILTER, "Invalid filter") \ X(YOKAN_ERR_FROM_MERCURY, "Mercury error") \ diff --git a/include/yokan/server.h b/include/yokan/server.h index e17ce1b..3127898 100644 --- a/include/yokan/server.h +++ b/include/yokan/server.h @@ -64,6 +64,22 @@ yk_return_t yk_provider_register( yk_return_t yk_provider_destroy( yk_provider_t provider); +/** + * @brief Migrates the database from the given provider to + * the target provider. + * + * @param provider + * @param dest_addr + * @param dest_provider_id + * @param options + * + * @return YOKAN_SUCCESS or error code defined in common.h + */ +yk_return_t yk_provider_migrate_database( + yk_provider_t provider, + const char* dest_addr, + uint16_t dest_provider_id, + const struct yk_migration_options* options); /** * @brief Returns the internal configuration of the YOKAN * provider. The returned string must be free-ed by the caller. diff --git a/spack.yaml b/spack.yaml index 0cb3be5..3a5a32b 100644 --- a/spack.yaml +++ b/spack.yaml @@ -19,7 +19,7 @@ spack: - mochi-bedrock ^mpich - ycsb-cpp-interface ^ycsb@master - rocksdb+rtti - - mochi-remi@0.3.3:+bedrock + - mochi-remi concretizer: unify: true reuse: true @@ -27,3 +27,10 @@ spack: prefix_inspections: lib: [LD_LIBRARY_PATH] lib64: [LD_LIBRARY_PATH] + packages: + mochi-margo: + require: "@0.15.0:" + mochi-thallium: + require: "@0.12.0:" + mochi-remi: + require: "@0.4.0:" diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index dfe4154..179c352 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -24,6 +24,7 @@ set (server-src-files server/doc_length.cpp server/doc_list.cpp server/doc_iter.cpp + server/get_remi_provider_id.cpp server/util/filters.cpp buffer/default_bulk_cache.cpp buffer/lru_bulk_cache.cpp diff --git a/src/backends/tkrzw.cpp b/src/backends/tkrzw.cpp index 4210387..d6df1d3 100644 --- a/src/backends/tkrzw.cpp +++ b/src/backends/tkrzw.cpp @@ -380,6 +380,7 @@ class TkrzwDatabase : public DocumentStoreMixin { } virtual void destroy() override { + if(!m_db) return; auto path = m_config["path"].get(); auto type = m_config["type"].get(); m_db->Close(); diff --git a/src/common/checks.h b/src/common/checks.h index 513ddae..c044762 100644 --- a/src/common/checks.h +++ b/src/common/checks.h @@ -6,6 +6,7 @@ #ifndef __YOKAN_CHECKS_H #define __YOKAN_CHECKS_H +#include "yokan/common.h" #include "logging.h" #define CHECK_HRET(__hret__, __fun__) \ diff --git a/src/common/types.h b/src/common/types.h index 883f099..784b0b8 100644 --- a/src/common/types.h +++ b/src/common/types.h @@ -503,6 +503,11 @@ MERCURY_GEN_PROC(doc_iter_direct_back_in_t, MERCURY_GEN_PROC(doc_iter_direct_back_out_t, ((int32_t)(ret))) +/* get_remi_provider_id */ +MERCURY_GEN_PROC(get_remi_provider_id_out_t, + ((int32_t)(ret))\ + ((uint16_t)(provider_id))) + /* Extra hand-coded serialization functions */ static inline hg_return_t hg_proc_yk_id_t( diff --git a/src/server/get_remi_provider_id.cpp b/src/server/get_remi_provider_id.cpp new file mode 100644 index 0000000..2ffe398 --- /dev/null +++ b/src/server/get_remi_provider_id.cpp @@ -0,0 +1,46 @@ +/* + * (C) 2021 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include "yokan/server.h" +#include "provider.hpp" +#include "../common/types.h" +#include "../common/defer.hpp" +#include "../common/logging.h" +#include "../common/checks.h" +#include +#ifdef YOKAN_HAS_REMI +#include +#endif + +void yk_get_remi_provider_id_ult(hg_handle_t h) +{ + get_remi_provider_id_out_t out = {YOKAN_SUCCESS, 0}; + + DEFER(margo_destroy(h)); + DEFER(margo_respond(h, &out)); + + margo_instance_id mid = margo_hg_handle_get_instance(h); + CHECK_MID(mid, margo_hg_handle_get_instance); + + const struct hg_info* info = margo_get_info(h); + yk_provider_t provider = (yk_provider_t)margo_registered_data(mid, info->id); + CHECK_PROVIDER(provider); + +#ifdef YOKAN_HAS_REMI + if(provider->remi.provider) { + int ret = remi_provider_get_provider_id( + provider->remi.provider, + &out.provider_id); + if(ret != REMI_SUCCESS) { + out.ret = YOKAN_ERR_FROM_REMI; + } + } else { + out.ret = YOKAN_ERR_OP_UNSUPPORTED; + } +#else + out.ret = YOKAN_ERR_OP_UNSUPPORTED; +#endif +} +DEFINE_MARGO_RPC_HANDLER(yk_get_remi_provider_id_ult) diff --git a/src/server/migration.hpp b/src/server/migration.hpp index 426c5ba..0b50561 100644 --- a/src/server/migration.hpp +++ b/src/server/migration.hpp @@ -6,8 +6,10 @@ #ifndef __MIGRATION_H #define __MIGRATION_H #ifdef YOKAN_HAS_REMI +#include #include #include +#include "../common/logging.h" #include "provider.hpp" #include #include @@ -21,7 +23,13 @@ static inline int32_t before_migration_cb(remi_fileset_t fileset, void* uargs) // is available and there isn't any database with the same name yet, // so we can do the migration safely. - (void)uargs; + yk_provider_t provider = (yk_provider_t)uargs; + if(provider->db) { + YOKAN_LOG_ERROR(provider->mid, + "Migration request rejected:" + " a database is already attached to this provider"); + return YOKAN_ERR_INVALID_DATABASE; + } const char* type = nullptr; const char* db_config = nullptr; @@ -60,6 +68,10 @@ static inline int32_t after_migration_cb(remi_fileset_t fileset, void* uargs) remi_fileset_get_metadata(fileset, "db_config", &db_config); remi_fileset_get_metadata(fileset, "migration_config", &migration_config); + std::cerr << "TYPE: " << type << std::endl; + std::cerr << "DB CONFIG: " << db_config << std::endl; + std::cerr << "MIG CONFIG: " << migration_config << std::endl; + auto json_db_config = json::parse(db_config); auto json_mig_config = json::parse(migration_config); @@ -86,9 +98,17 @@ static inline int32_t after_migration_cb(remi_fileset_t fileset, void* uargs) yk_database_t database; auto status = yokan::DatabaseFactory::recoverDatabase( type, db_config, migration_config, files, &database); - if(status != yokan::Status::OK) return (int32_t)status; + if(status != yokan::Status::OK) { + YOKAN_LOG_ERROR(provider->mid, + "Could not recover database: DatabaseFactory::recoverDatabase returned %d", + status); + return (int32_t)status; + } provider->db = database; + provider->config["database"] = json::object(); + provider->config["database"]["type"] = type; + provider->config["database"]["config"] = json::parse(database->config()); return 0; } diff --git a/src/server/provider.cpp b/src/server/provider.cpp index a6ea255..c41ef5d 100644 --- a/src/server/provider.cpp +++ b/src/server/provider.cpp @@ -127,14 +127,12 @@ yk_return_t yk_provider_register( "Yokan provider initialized with only a REMI provider" " will only be able to *receive* databases from other providers"); } - p->remi.client = a.remi.client; + p->remi.client = a.remi.client; p->remi.provider = a.remi.provider; if(p->remi.provider) { - char remi_class[16]; - sprintf(remi_class, "yokan/%05u", provider_id); - int remi_ret = remi_provider_register_migration_class( - p->remi.provider, remi_class, before_migration_cb, - after_migration_cb, nullptr, p); + int remi_ret = remi_provider_register_provider_migration_class( + p->remi.provider, "yokan", provider_id, before_migration_cb, + after_migration_cb, [](void*){}, p); if(remi_ret != REMI_SUCCESS) { YOKAN_LOG_ERROR(mid, "Failed to register migration class in REMI:" @@ -448,6 +446,12 @@ yk_return_t yk_provider_register( else p->doc_iter_direct_back_id = MARGO_REGISTER( mid, "yk_doc_iter_direct_back", doc_iter_direct_back_in_t, doc_iter_back_out_t, NULL); + id = MARGO_REGISTER_PROVIDER(mid, "yk_get_remi_provider_id", + void, get_remi_provider_id_out_t, + yk_get_remi_provider_id_ult, provider_id, p->pool); + margo_register_data(mid, id, (void*)p, NULL); + p->get_remi_provider_id = id; + margo_provider_push_finalize_callback(mid, p, &yk_finalize_provider, p); margo_provider_register_identity(mid, provider_id, "yokan"); @@ -462,6 +466,11 @@ static void yk_finalize_provider(void* p) { yk_provider_t provider = (yk_provider_t)p; margo_instance_id mid = provider->mid; +#ifdef YOKAN_HAS_REMI + if(provider->remi.provider) + remi_provider_deregister_provider_migration_class( + provider->remi.provider, "yokan", provider->provider_id); +#endif if(provider->db) { provider->db->destroy(); delete provider->db; @@ -526,125 +535,163 @@ char* yk_provider_get_config(yk_provider_t provider) return strdup(provider->config.dump().c_str()); } -#if 0 -void yk_migrate_database_ult(hg_handle_t h) -{ - hg_return_t hret; - migrate_database_in_t in; - migrate_database_out_t out; - hg_addr_t target_addr = HG_ADDR_NULL; +static inline yk_return_t get_remi_provider_id_from_remote( + yk_provider_t provider, + hg_addr_t dest_address, + uint16_t dest_provider_id, + uint16_t* remi_provider_id) { - DEFER(margo_destroy(h)); - DEFER(margo_respond(h, &out)); + hg_handle_t handle = HG_HANDLE_NULL; + get_remi_provider_id_out_t out = {0,0}; + hg_return_t hret = HG_SUCCESS; - margo_instance_id mid = margo_hg_handle_get_instance(h); - CHECK_MID(mid, margo_hg_handle_get_instance); + hret = margo_create(provider->mid, dest_address, provider->get_remi_provider_id, &handle); + if(hret != HG_SUCCESS) return YOKAN_ERR_FROM_MERCURY; + DEFER(margo_destroy(handle)); - const struct hg_info* info = margo_get_info(h); - yk_provider_t provider = (yk_provider_t)margo_registered_data(mid, info->id); - CHECK_PROVIDER(provider); + hret = margo_provider_forward(dest_provider_id, handle, NULL); + if(hret != HG_SUCCESS) return YOKAN_ERR_FROM_MERCURY; -#ifdef YOKAN_HAS_REMI - hret = margo_get_input(h, &in); - CHECK_HRET_OUT(hret, margo_get_input); - DEFER(margo_free_input(h, &in)); - - if(!check_token(provider, in.token)) { - YOKAN_LOG_ERROR(mid, "invalid token"); - out.ret = YOKAN_ERR_INVALID_TOKEN; - return; - } + hret = margo_get_output(handle, &out); + if(hret != HG_SUCCESS) return YOKAN_ERR_FROM_MERCURY; + DEFER(margo_free_output(handle, &out)); + + if(out.ret == YOKAN_SUCCESS) *remi_provider_id = out.provider_id; + return static_cast(out.ret); +} - auto database = find_database(provider, &in.origin_id); - CHECK_DATABASE(database, in.origin_id); +yk_return_t yk_provider_migrate_database( + yk_provider_t provider, + const char* dest_addr_str, + uint16_t dest_provider_id, + const struct yk_migration_options* options) { - // lookup target address - hret = margo_addr_lookup(mid, in.target_address, &target_addr); - CHECK_HRET_OUT(hret, margo_addr_lookup); - DEFER(margo_addr_free(mid, target_addr)); + if(!provider || !dest_addr_str) + return YOKAN_ERR_INVALID_ARGS; + +#ifndef YOKAN_HAS_REMI + return YOKAN_ERR_OP_UNSUPPORTED; +#else + + yk_return_t ret; + hg_return_t hret; + uint16_t remi_provider_id; + hg_addr_t dest_addr; + + // check if there is a database to migrate + auto database = provider->db; + if(!database) return YOKAN_ERR_INVALID_DATABASE; + + // lookup destination address + hret = margo_addr_lookup(provider->mid, dest_addr_str, &dest_addr); + if(hret != HG_SUCCESS) return YOKAN_ERR_FROM_MERCURY; + DEFER(margo_addr_free(provider->mid, dest_addr)); + + // get the REMI provider Id associated with the destination provider + ret = get_remi_provider_id_from_remote( + provider, dest_addr, dest_provider_id, + &remi_provider_id); + if(ret != YOKAN_SUCCESS) return ret; // create REMI provider handle remi_provider_handle_t remi_ph = NULL; int rret = remi_provider_handle_create( - provider->remi.client, target_addr, in.target_provider_id, + provider->remi.client, dest_addr, remi_provider_id, &remi_ph); - CHECK_RRET_OUT(rret, remi_provider_handle_create); + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_provider_handle_create returned %d", rret); + return YOKAN_ERR_FROM_REMI; + } DEFER(remi_provider_handle_release(remi_ph)); // create MigrationHandle from the database std::unique_ptr mh; auto status = database->startMigration(mh); - - if(status != yokan::Status::OK) { - out.ret = static_cast(status); - return; - } + if(status != yokan::Status::OK) + return static_cast(status); // create REMI fileset remi_fileset_t fileset = REMI_FILESET_NULL; - char remi_class[16]; - sprintf(remi_class, "yokan/%05u", in.target_provider_id); - rret = remi_fileset_create(remi_class, mh->getRoot().c_str(), &fileset); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_create, mh); + rret = remi_fileset_create("yokan", mh->getRoot().c_str(), &fileset); + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_fileset_create returned %d", rret); + return YOKAN_ERR_FROM_REMI; + } DEFER(remi_fileset_free(fileset)); + // set destination provider ID + remi_fileset_set_provider_id(fileset, dest_provider_id); + + // set transfer size + if(options) + remi_fileset_set_xfer_size(fileset, options->xfer_size); + // fill REMI fileset for(const auto& file : mh->getFiles()) { if(!file.empty() && file.back() == '/') { rret = remi_fileset_register_directory(fileset, file.c_str()); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_register_file, mh); + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_fileset_register_directory returned %d", rret); + return YOKAN_ERR_FROM_REMI; + } } else { rret = remi_fileset_register_file(fileset, file.c_str()); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_register_file, mh); + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_fileset_register_file returned %d", rret); + return YOKAN_ERR_FROM_REMI; + } } } // register REMI metadata - char uuid[37]; - uuid_unparse(in.origin_id.uuid, uuid); - rret = remi_fileset_register_metadata(fileset, - "uuid", uuid); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_register_metadata, mh); rret = remi_fileset_register_metadata(fileset, "db_config", database->config().c_str()); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_register_metadata, mh); + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_fileset_register_metadata returned %d", rret); + return YOKAN_ERR_FROM_REMI; + } rret = remi_fileset_register_metadata(fileset, "type", database->type().c_str()); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_register_metadata, mh); - rret = remi_fileset_register_metadata(fileset, - "name", database->name().c_str()); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_register_metadata, mh); + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_fileset_register_metadata returned %d", rret); + return YOKAN_ERR_FROM_REMI; + } rret = remi_fileset_register_metadata(fileset, - "migration_config", in.extra_config); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_register_metadata, mh); - - // set xfer size - if(in.xfer_size) { - rret = remi_fileset_set_xfer_size(fileset, in.xfer_size); - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_set_xfer_size, mh); + "migration_config", options && options->extra_config ? options->extra_config : "{}"); + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_fileset_register_metadata returned %d", rret); + return YOKAN_ERR_FROM_REMI; } // issue migration int remi_status = 0; - rret = remi_fileset_migrate(remi_ph, fileset, in.new_root, - REMI_KEEP_SOURCE, REMI_USE_MMAP, &remi_status); + auto new_root = options && options->new_root ? options->new_root : NULL; + rret = remi_fileset_migrate(remi_ph, fileset, + new_root ? new_root : mh->getRoot().c_str(), + REMI_REMOVE_SOURCE, REMI_USE_MMAP, &remi_status); if(remi_status) { - YOKAN_LOG_ERROR(mid, "REMI migration callback returned %d on target", remi_status); - YOKAN_LOG_ERROR(mid, "^ target was %s with provider id %d", in.target_address, in.target_provider_id); - out.ret = remi_status; + YOKAN_LOG_ERROR(provider->mid, "REMI migration callback returned %d on target", remi_status); + YOKAN_LOG_ERROR(provider->mid, "^ target was %s with provider id %d", dest_addr_str, dest_provider_id); mh->cancel(); - return; + return YOKAN_ERR_FROM_REMI; } - CHECK_RRET_OUT_CANCEL(rret, remi_fileset_migrate, mh); - out.target_id = in.origin_id; - out.ret = YOKAN_SUCCESS; -#else - out.ret = YOKAN_ERR_OP_UNSUPPORTED; + if(rret != REMI_SUCCESS) { + YOKAN_LOG_ERROR(provider->mid, "remi_fileset_migrate returned %d", rret); + mh->cancel(); + return YOKAN_ERR_FROM_REMI; + } + + mh.reset(); + + // clear database locally + database->destroy(); + delete provider->db; + provider->db = nullptr; + + return YOKAN_SUCCESS; #endif } -DEFINE_MARGO_RPC_HANDLER(yk_migrate_database_ult) -#endif static inline bool open_database_from_config(yk_provider_t provider) { diff --git a/src/server/provider.hpp b/src/server/provider.hpp index c20cd7b..c44782d 100644 --- a/src/server/provider.hpp +++ b/src/server/provider.hpp @@ -6,6 +6,7 @@ #ifndef __PROVIDER_H #define __PROVIDER_H +#include "config.h" #include "yokan/server.h" #include "yokan/backend.hpp" #include "yokan/bulk-cache.h" @@ -75,6 +76,7 @@ typedef struct yk_provider { hg_id_t doc_iter_direct_id; hg_id_t doc_iter_back_id; hg_id_t doc_iter_direct_back_id; + hg_id_t get_remi_provider_id; // REMI information struct { @@ -162,4 +164,6 @@ void yk_doc_iter_ult(hg_handle_t h); DECLARE_MARGO_RPC_HANDLER(yk_doc_iter_direct_ult) void yk_doc_iter_direct_ult(hg_handle_t h); +DECLARE_MARGO_RPC_HANDLER(yk_get_remi_provider_id_ult) +void yk_get_remi_provider_id_ult(hg_handle_t h); #endif diff --git a/tests/test-migration.cpp b/tests/test-migration.cpp new file mode 100644 index 0000000..1352438 --- /dev/null +++ b/tests/test-migration.cpp @@ -0,0 +1,238 @@ +/* + * (C) 2021 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include "config.h" +#ifdef YOKAN_HAS_REMI + +#include +#include +#include +#include +#include +#include +#include +#include "available-backends.h" +#include "munit/munit.h" + +struct test_context { + margo_instance_id mid; + hg_addr_t addr; + char* addr_str; + remi_client_t remi_client; + yk_client_t yokan_client; + yk_provider_t yokan_providers[2]; + const char* backend; +}; + +static void* test_context_setup(const MunitParameter params[], void* user_data) +{ + (void) params; + (void) user_data; + margo_instance_id mid; + hg_addr_t addr; + hg_return_t hret; + yk_return_t yret; + int ret; + const char* backend = munit_parameters_get(params, "backend"); + + // create margo instance + mid = margo_init("ofi+tcp", MARGO_SERVER_MODE, 0, 0); + munit_assert_not_null(mid); + + // set log level + margo_set_global_log_level(MARGO_LOG_INFO); + margo_set_log_level(mid, MARGO_LOG_INFO); + + // get address of current process + hret = margo_addr_self(mid, &addr); + munit_assert_int(hret, ==, HG_SUCCESS); + + // get address as a string + char addr_str[128]; + hg_size_t bufsize = 128; + hret = margo_addr_to_string(mid, addr_str, &bufsize, addr); + + // register remi provider + remi_provider_t remi_provider; + ret = remi_provider_register( + mid, ABT_IO_INSTANCE_NULL, + 3, ABT_POOL_NULL, &remi_provider); + munit_assert_int(ret, ==, REMI_SUCCESS); + + // create remi client + remi_client_t remi_client; + remi_client_init(mid, ABT_IO_INSTANCE_NULL, &remi_client); + + // register yk provider 1 with a database + yk_provider_t provider1; + struct yk_provider_args args = YOKAN_PROVIDER_ARGS_INIT; + args.remi.provider = REMI_PROVIDER_NULL; + args.remi.client = remi_client; + auto provider1_config = make_provider_config(backend); + yret = yk_provider_register( + mid, 1, provider1_config.c_str(), &args, + &provider1); + munit_assert_int(yret, ==, YOKAN_SUCCESS); + + // register yk provider 2 without a database + yk_provider_t provider2; + args.remi.provider = remi_provider; + args.remi.client = REMI_CLIENT_NULL; + yret = yk_provider_register( + mid, 2, "{}", &args, + &provider2); + munit_assert_int(yret, ==, YOKAN_SUCCESS); + + // create a Yokan client object + yk_client_t yokan_client; + ret = yk_client_init(mid, &yokan_client); + munit_assert_int(ret, ==, YOKAN_SUCCESS); + + // create test context + struct test_context* context = (struct test_context*)calloc(1, sizeof(*context)); + munit_assert_not_null(context); + context->mid = mid; + context->addr = addr; + context->addr_str = strdup(addr_str); + context->remi_client = remi_client; + context->yokan_client = yokan_client; + context->backend = backend; + context->yokan_providers[0] = provider1; + context->yokan_providers[1] = provider2; + + return context; +} + +static void test_context_tear_down(void* fixture) +{ + struct test_context* context = (struct test_context*)fixture; + // free address + margo_addr_free(context->mid, context->addr); + free(context->addr_str); + // free the REMI client + remi_client_finalize(context->remi_client); + // free the Yokan client + yk_client_finalize(context->yokan_client); + + // we are not checking the return value of the above function with + // munit because we need margo_finalize to be called no matter what. + margo_finalize(context->mid); +} + +static MunitResult test_migration(const MunitParameter params[], void* data) +{ + (void)params; + (void)data; + struct test_context* context = (struct test_context*)data; + yk_return_t ret; + + // get a handle to the database in provider 1 + yk_database_handle_t dbh1; + ret = yk_database_handle_create( + context->yokan_client, + context->addr, 1, true, + &dbh1); + munit_assert_int(ret, ==, YOKAN_SUCCESS); + + int stores_values = (strcmp(context->backend, "set") != 0) + && (strcmp(context->backend, "unordered_set") != 0); + stores_values = stores_values ? 1 : 0; + + // write some values to it + for(int i = 0; i < 10; i++) { + char key[16]; + char value[16]; + sprintf(key, "key%05d", i); + sprintf(value, "value%05d", i); + size_t ksize = strlen(key); + size_t vsize = strlen(value) * stores_values; + ret = yk_put(dbh1, 0, key, ksize, value, vsize); + munit_assert_int(ret, ==, YOKAN_SUCCESS); + } + + char new_root[128]; + sprintf(new_root, "/tmp/migrated-%s", context->backend); + + // migrate the database to provider 2 + yk_migration_options options; + options.new_root = new_root; + options.extra_config = "{}"; + options.xfer_size = 0; + ret = yk_provider_migrate_database( + context->yokan_providers[0], + context->addr_str, 2, &options); + if(ret == YOKAN_ERR_OP_UNSUPPORTED) { + yk_database_handle_release(dbh1); + return MUNIT_SKIP; + } + munit_assert_int(ret, ==, YOKAN_SUCCESS); + + // trying to access the database from provider 1 should get us an error + ret = yk_put(dbh1, 0, "abc", 3, "def", 3 * stores_values); + munit_assert_int(ret, ==, YOKAN_ERR_INVALID_DATABASE); + + // release handle + ret = yk_database_handle_release(dbh1); + munit_assert_int(ret, ==, YOKAN_SUCCESS); + + // create database handle this time with provider 2 + yk_database_handle_t dbh2; + yk_database_handle_create( + context->yokan_client, + context->addr, 2, true, + &dbh2); + munit_assert_int(ret, ==, YOKAN_SUCCESS); + + // check that we can read the values from the migrated database + for(int i = 0; i < 10; i++) { + char key[16]; + char value[16]; + char expected[16]; + sprintf(key, "key%05d", i); + sprintf(expected, "value%05d", i); + memset(value, 0, 16); + size_t ksize = strlen(key); + size_t vsize = 16 * stores_values; + ret = yk_get(dbh2, 0, key, ksize, value, &vsize); + munit_assert_int(ret, ==, YOKAN_SUCCESS); + if(stores_values) { + munit_assert_int(vsize, ==, strlen(expected)); + munit_assert_string_equal(value, expected); + } + } + + // release handle + ret = yk_database_handle_release(dbh2); + munit_assert_int(ret, ==, YOKAN_SUCCESS); + + return MUNIT_OK; +} + +static MunitParameterEnum test_params[] = { + { (char*)"backend", (char**)available_backends }, + { NULL, NULL } +}; + +static MunitTest test_suite_tests[] = { + { (char*) "/migration", test_migration, + test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params }, + { NULL, NULL, NULL, NULL, MUNIT_TEST_OPTION_NONE, NULL } +}; + +static const MunitSuite test_suite = { + (char*) "/yk/admin", test_suite_tests, NULL, 1, MUNIT_SUITE_OPTION_NONE +}; + +int main(int argc, char* argv[MUNIT_ARRAY_PARAM(argc + 1)]) { + return munit_suite_main(&test_suite, (void*) "yk", argc, argv); +} + +#else // YOKAN_HAS_REMI + +int main(int argc, char* argv[]) { + return 0; +} + +#endif // YOKAN_HAS_REMI