Skip to content

Commit

Permalink
re-enabled migration
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Dec 2, 2023
1 parent 2938841 commit 7d4e8cb
Show file tree
Hide file tree
Showing 13 changed files with 471 additions and 86 deletions.
2 changes: 1 addition & 1 deletion include/yokan/backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ template <typename T> class __YOKANBackendRegistration {
template<typename ... U> using void_t = void;

template<typename U> using recover_t =
decltype(U::recover("", "", "", {}, nullptr));
decltype(U::recover("", "", {}, nullptr));

template<typename U, typename = void_t<>>
struct has_recover : std::false_type {};
Expand Down
5 changes: 2 additions & 3 deletions include/yokan/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ extern "C" {
X(YOKAN_ERR_ALLOCATION, "Allocation error") \
X(YOKAN_ERR_INVALID_MID, "Invalid margo instance") \
X(YOKAN_ERR_INVALID_ARGS, "Invalid argument") \
X(YOKAN_ERR_INVALID_PROVIDER, "Invalid provider id") \
X(YOKAN_ERR_INVALID_DATABASE, "Invalid database id") \
X(YOKAN_ERR_INVALID_PROVIDER, "Invalid provider") \
X(YOKAN_ERR_INVALID_DATABASE, "Invalid database") \
X(YOKAN_ERR_INVALID_BACKEND, "Invalid backend type") \
X(YOKAN_ERR_INVALID_CONFIG, "Invalid configuration") \
X(YOKAN_ERR_INVALID_TOKEN, "Invalid token") \
X(YOKAN_ERR_INVALID_ID, "Invalid document id") \
X(YOKAN_ERR_INVALID_FILTER, "Invalid filter") \
X(YOKAN_ERR_FROM_MERCURY, "Mercury error") \
Expand Down
16 changes: 16 additions & 0 deletions include/yokan/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ yk_return_t yk_provider_register(
yk_return_t yk_provider_destroy(
yk_provider_t provider);

/**
* @brief Migrates the database from the given provider to
* the target provider.
*
* @param provider
* @param dest_addr
* @param dest_provider_id
* @param options
*
* @return YOKAN_SUCCESS or error code defined in common.h
*/
yk_return_t yk_provider_migrate_database(
yk_provider_t provider,
const char* dest_addr,
uint16_t dest_provider_id,
const struct yk_migration_options* options);
/**
* @brief Returns the internal configuration of the YOKAN
* provider. The returned string must be free-ed by the caller.
Expand Down
9 changes: 8 additions & 1 deletion spack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ spack:
- mochi-bedrock ^mpich
- ycsb-cpp-interface ^ycsb@master
- rocksdb+rtti
- mochi-remi@0.3.3:+bedrock
- mochi-remi
concretizer:
unify: true
reuse: true
modules:
prefix_inspections:
lib: [LD_LIBRARY_PATH]
lib64: [LD_LIBRARY_PATH]
packages:
mochi-margo:
require: "@0.15.0:"
mochi-thallium:
require: "@0.12.0:"
mochi-remi:
require: "@0.4.0:"
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ set (server-src-files
server/doc_length.cpp
server/doc_list.cpp
server/doc_iter.cpp
server/get_remi_provider_id.cpp
server/util/filters.cpp
buffer/default_bulk_cache.cpp
buffer/lru_bulk_cache.cpp
Expand Down
1 change: 1 addition & 0 deletions src/backends/tkrzw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ class TkrzwDatabase : public DocumentStoreMixin<DatabaseInterface> {
}

virtual void destroy() override {
if(!m_db) return;
auto path = m_config["path"].get<std::string>();
auto type = m_config["type"].get<std::string>();
m_db->Close();
Expand Down
1 change: 1 addition & 0 deletions src/common/checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef __YOKAN_CHECKS_H
#define __YOKAN_CHECKS_H

#include "yokan/common.h"
#include "logging.h"

#define CHECK_HRET(__hret__, __fun__) \
Expand Down
5 changes: 5 additions & 0 deletions src/common/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,11 @@ MERCURY_GEN_PROC(doc_iter_direct_back_in_t,
MERCURY_GEN_PROC(doc_iter_direct_back_out_t,
((int32_t)(ret)))

/* get_remi_provider_id */
MERCURY_GEN_PROC(get_remi_provider_id_out_t,
((int32_t)(ret))\
((uint16_t)(provider_id)))

/* Extra hand-coded serialization functions */

static inline hg_return_t hg_proc_yk_id_t(
Expand Down
46 changes: 46 additions & 0 deletions src/server/get_remi_provider_id.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* (C) 2021 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "yokan/server.h"
#include "provider.hpp"
#include "../common/types.h"
#include "../common/defer.hpp"
#include "../common/logging.h"
#include "../common/checks.h"
#include <numeric>
#ifdef YOKAN_HAS_REMI
#include <remi/remi-server.h>
#endif

void yk_get_remi_provider_id_ult(hg_handle_t h)
{
get_remi_provider_id_out_t out = {YOKAN_SUCCESS, 0};

DEFER(margo_destroy(h));
DEFER(margo_respond(h, &out));

margo_instance_id mid = margo_hg_handle_get_instance(h);
CHECK_MID(mid, margo_hg_handle_get_instance);

const struct hg_info* info = margo_get_info(h);
yk_provider_t provider = (yk_provider_t)margo_registered_data(mid, info->id);
CHECK_PROVIDER(provider);

#ifdef YOKAN_HAS_REMI
if(provider->remi.provider) {
int ret = remi_provider_get_provider_id(
provider->remi.provider,
&out.provider_id);
if(ret != REMI_SUCCESS) {
out.ret = YOKAN_ERR_FROM_REMI;
}
} else {
out.ret = YOKAN_ERR_OP_UNSUPPORTED;
}
#else
out.ret = YOKAN_ERR_OP_UNSUPPORTED;
#endif
}
DEFINE_MARGO_RPC_HANDLER(yk_get_remi_provider_id_ult)
24 changes: 22 additions & 2 deletions src/server/migration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
#ifndef __MIGRATION_H
#define __MIGRATION_H
#ifdef YOKAN_HAS_REMI
#include <iostream>
#include <nlohmann/json.hpp>
#include <yokan/common.h>
#include "../common/logging.h"
#include "provider.hpp"
#include <remi/remi-common.h>
#include <remi/remi-client.h>
Expand All @@ -21,7 +23,13 @@ static inline int32_t before_migration_cb(remi_fileset_t fileset, void* uargs)
// is available and there isn't any database with the same name yet,
// so we can do the migration safely.

(void)uargs;
yk_provider_t provider = (yk_provider_t)uargs;
if(provider->db) {
YOKAN_LOG_ERROR(provider->mid,
"Migration request rejected:"
" a database is already attached to this provider");
return YOKAN_ERR_INVALID_DATABASE;
}

const char* type = nullptr;
const char* db_config = nullptr;
Expand Down Expand Up @@ -60,6 +68,10 @@ static inline int32_t after_migration_cb(remi_fileset_t fileset, void* uargs)
remi_fileset_get_metadata(fileset, "db_config", &db_config);
remi_fileset_get_metadata(fileset, "migration_config", &migration_config);

std::cerr << "TYPE: " << type << std::endl;
std::cerr << "DB CONFIG: " << db_config << std::endl;
std::cerr << "MIG CONFIG: " << migration_config << std::endl;

auto json_db_config = json::parse(db_config);
auto json_mig_config = json::parse(migration_config);

Expand All @@ -86,9 +98,17 @@ static inline int32_t after_migration_cb(remi_fileset_t fileset, void* uargs)
yk_database_t database;
auto status = yokan::DatabaseFactory::recoverDatabase(
type, db_config, migration_config, files, &database);
if(status != yokan::Status::OK) return (int32_t)status;
if(status != yokan::Status::OK) {
YOKAN_LOG_ERROR(provider->mid,
"Could not recover database: DatabaseFactory::recoverDatabase returned %d",
status);
return (int32_t)status;
}

provider->db = database;
provider->config["database"] = json::object();
provider->config["database"]["type"] = type;
provider->config["database"]["config"] = json::parse(database->config());

return 0;
}
Expand Down
Loading

0 comments on commit 7d4e8cb

Please sign in to comment.