Skip to content

Commit

Permalink
feat: integrate anatoliy o11y prototype
Browse files Browse the repository at this point in the history
State machines now emit logs when moving states and when creating
associations with other state machines. The logs can then be used to
plot a diagram of the sequence of events.
  • Loading branch information
letFunny authored and just-now committed Jul 8, 2024
1 parent 921c368 commit 2f5ecff
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 31 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ raft_core_fuzzy_test_LDFLAGS = -no-install
raft_core_fuzzy_test_LDADD = libtest.la libraft.la

raft_uv_unit_test_SOURCES = \
src/tracing.c \
src/raft/err.c \
src/raft/heap.c \
src/raft/syscall.c \
Expand Down
27 changes: 24 additions & 3 deletions src/lib/sm.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "sm.h"
#include <stdatomic.h>
#include <stddef.h> /* NULL */
#include <stdio.h> /* fprintf */
#include <string.h>
#include <unistd.h>
#include "../tracing.h"
#include "../utils.h"

Expand All @@ -16,18 +19,37 @@ int sm_state(const struct sm *m)
return m->state;
}

static inline void sm_obs(const struct sm *m)
{
tracef("%s pid: %d sm_id: %lu %s |\n",
m->name, m->pid, m->id, m->conf[sm_state(m)].name);
}

void sm_relate(const struct sm *from, const struct sm *to)
{
tracef("%s-to-%s opid: %d dpid: %d id: %lu id: %lu |\n",
from->name, to->name, from->pid, to->pid, from->id, to->id);
}

void sm_init(struct sm *m,
bool (*invariant)(const struct sm *, int),
bool (*is_locked)(const struct sm *),
const struct sm_conf *conf,
const char *name,
int state)
{
static atomic_uint_least64_t id = 0;

PRE(conf[state].flags & SM_INITIAL);

m->conf = conf;
m->state = state;
m->invariant = invariant;
m->is_locked = is_locked;
m->id = ++id;
m->pid = getpid();
snprintf(m->name, SM_MAX_NAME_LENGTH, "%s", name);
sm_obs(m);

POST(m->invariant != NULL && m->invariant(m, SM_PREV_NONE));
}
Expand All @@ -42,11 +64,10 @@ void sm_move(struct sm *m, int next_state)
{
int prev = sm_state(m);

tracef("SM_MOVE %s => %s", m->conf[prev].name, m->conf[next_state].name);

PRE(sm_is_locked(m));
PRE(m->conf[sm_state(m)].allowed & BITS(next_state));
m->state = next_state;
sm_obs(m);
POST(m->invariant != NULL && m->invariant(m, prev));
}

Expand All @@ -65,7 +86,7 @@ void sm_fail(struct sm *m, int fail_state, int rc)

static __attribute__((noinline)) bool check_failed(const char *f, int n, const char *s)
{
fprintf(stderr, "%s:%d check failed: %s\n", f, n, s);
tracef("%s:%d check failed: %s\n", f, n, s);
return false;
}

Expand Down
9 changes: 9 additions & 0 deletions src/lib/sm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>

#define BITS(state) (1ULL << (state))

#define CHECK(cond) sm_check((cond), __FILE__, __LINE__, #cond)

#define SM_MAX_NAME_LENGTH 50

enum {
SM_PREV_NONE = -1,
/* sizeof(sm_conf::allowed * 8) */
Expand All @@ -29,6 +32,9 @@ struct sm
{
int rc;
int state;
char name[SM_MAX_NAME_LENGTH];
uint64_t id;
pid_t pid;
bool (*is_locked)(const struct sm *);
bool (*invariant)(const struct sm *, int);
const struct sm_conf *conf;
Expand All @@ -39,11 +45,14 @@ void sm_init(struct sm *m,
/* optional, set NULL if not used */
bool (*is_locked)(const struct sm *),
const struct sm_conf *conf,
const char *name,
int state);
void sm_fini(struct sm *m);
void sm_move(struct sm *m, int next_state);
void sm_fail(struct sm *m, int fail_state, int rc);
int sm_state(const struct sm *m);
bool sm_check(bool b, const char *f, int n, const char *s);
/* Relates one state machine to another for observability. */
void sm_relate(const struct sm *from, const struct sm *to);

#endif /* __LIB_SM__ */
2 changes: 1 addition & 1 deletion src/lib/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ static void planner(void *arg)
queue *u = &pi->unordered;
queue *q;

sm_init(planner_sm, planner_invariant, NULL, planner_states,
sm_init(planner_sm, planner_invariant, NULL, planner_states, "ps",
PS_NOTHING);
uv_sem_post(sem);
uv_mutex_lock(mutex);
Expand Down
22 changes: 8 additions & 14 deletions src/raft/recv_install_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,7 @@ static void follower_work_done(struct work *w)

static void to_init(struct timeout *to)
{
// static const char *to_sm_name = "to";
// to->sm = (struct sm){ .name = to_sm_name };
sm_init(&to->sm, to_sm_invariant, NULL, to_sm_conf, TO_INIT);
sm_init(&to->sm, to_sm_invariant, NULL, to_sm_conf, "to", TO_INIT);
}

static void leader_to_cb(struct timeout *t, int rc)
Expand All @@ -435,7 +433,7 @@ static void to_start(struct timeout *to, unsigned delay, to_cb_op to_cb)

to_init(to);
leader->ops->to_start(&leader->timeout, 10000, to_cb);
// sm_to_sm_obs(&leader->sm, &to->sm);
sm_relate(&leader->sm, &to->sm);
sm_move(&to->sm, TO_STARTED);
}

Expand Down Expand Up @@ -499,16 +497,12 @@ static bool is_a_duplicate(const void *state,

static void work_init(struct work *w)
{
// static const char *work_sm_name = "work";
// w->sm = (struct sm){ .name = work_sm_name };
sm_init(&w->sm, work_sm_invariant, NULL, work_sm_conf, WORK_INIT);
sm_init(&w->sm, work_sm_invariant, NULL, work_sm_conf, "work", WORK_INIT);
}

static void rpc_init(struct rpc *rpc)
{
// static const char *rpc_sm_name = "rpc";
// rpc->sm = (struct sm){ .name = rpc_sm_name };
sm_init(&rpc->sm, rpc_sm_invariant, NULL, rpc_sm_conf, RPC_INIT);
sm_init(&rpc->sm, rpc_sm_invariant, NULL, rpc_sm_conf, "rpc", RPC_INIT);
}

static void rpc_fini(struct rpc *rpc)
Expand All @@ -521,7 +515,7 @@ static void work_fill_leader(struct leader *leader)
{
leader->work_cb = leader->ops->ht_create;
work_init(&leader->work);
// sm_to_sm_obs(&leader->sm, &leader->work.sm);
sm_relate(&leader->sm, &leader->work.sm);
}

static void work_fill_follower(struct follower *follower)
Expand All @@ -541,19 +535,19 @@ static void work_fill_follower(struct follower *follower)
break;
}
work_init(&follower->work);
// sm_to_sm_obs(&follower->sm, &follower->work.sm);
sm_relate(&follower->sm, &follower->work.sm);
}

static void rpc_fill_leader(struct leader *leader)
{
rpc_init(&leader->rpc);
// sm_to_sm_obs(&leader->sm, &leader->rpc.sm);
sm_relate(&leader->sm, &leader->rpc.sm);
}

static void rpc_fill_follower(struct follower *follower)
{
rpc_init(&follower->rpc);
// sm_to_sm_obs(&follower->sm, &follower->rpc.sm);
sm_relate(&follower->sm, &follower->rpc.sm);
}

static int rpc_send(struct rpc *rpc, sender_send_op op, to_cb_op to_cb,
Expand Down
2 changes: 1 addition & 1 deletion src/vfs2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ static int open_entry(struct common *common, const char *name, struct entry *e)

*e = (struct entry){};
e->common = common;
sm_init(&e->wtx_sm, wtx_invariant, NULL, wtx_states, WTX_CLOSED);
sm_init(&e->wtx_sm, wtx_invariant, NULL, wtx_states, "wtx", WTX_CLOSED);

e->refcount_main_db = 1;

Expand Down
20 changes: 13 additions & 7 deletions test/raft/lib/runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
#ifndef TEST_RUNNER_H_
#define TEST_RUNNER_H_

#include <signal.h>

#include "munit.h"

#include "../../../src/tracing.h"

/* Top-level suites array declaration.
*
* These top-level suites hold all module-level child suites and must be defined
Expand All @@ -18,14 +22,16 @@ extern int _main_suites_n;

/* Define the top-level suites array and the main() function of the test. */
#define RUNNER(NAME) \
MunitSuite _main_suites[SUITE__CAP]; \
int _main_suites_n = 0; \
MunitSuite _main_suites[SUITE__CAP]; \
int _main_suites_n = 0; \
\
int main(int argc, char *argv[MUNIT_ARRAY_PARAM(argc)]) \
{ \
MunitSuite suite = {(char *)"", NULL, _main_suites, 1, 0}; \
return munit_suite_main(&suite, (void *)NAME, argc, argv); \
}
int main(int argc, char *argv[MUNIT_ARRAY_PARAM(argc)]) \
{ \
signal(SIGPIPE, SIG_IGN); \
dqliteTracingMaybeEnable(true); \
MunitSuite suite = {(char *)"", NULL, _main_suites, 1, 0}; \
return munit_suite_main(&suite, (void *)NAME, argc, argv); \
}

/* Declare and register a new test suite #S belonging to the file's test module.
*
Expand Down
6 changes: 2 additions & 4 deletions test/raft/unit/test_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,10 @@ TEST(snapshot, follower, set_up, tear_down, 0, NULL) {

struct follower follower = {
.ops = &ops,
// .sm = { .name = "leader" },
};

sm_init(&follower.sm, follower_sm_invariant,
NULL, follower_sm_conf, FS_NORMAL);
NULL, follower_sm_conf, "follower", FS_NORMAL);

PRE(sm_state(&follower.sm) == FS_NORMAL);
ut_follower_message_received(&follower, ut_install_snapshot());
Expand Down Expand Up @@ -237,15 +236,14 @@ TEST(snapshot, leader, set_up, tear_down, 0, NULL) {

struct leader leader = {
.ops = &ops,
// .sm = { .name = "leader" },

.sigs_more = false,
.pages_more = false,
.sigs_calculated = false,
};

sm_init(&leader.sm, leader_sm_invariant,
NULL, leader_sm_conf, LS_F_ONLINE);
NULL, leader_sm_conf, "leader", LS_F_ONLINE);

PRE(sm_state(&leader.sm) == LS_F_ONLINE);
ut_leader_message_received(&leader, append_entries());
Expand Down
2 changes: 1 addition & 1 deletion test/unit/test_sm.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TEST_CASE(sm, simple, NULL)
struct op_states_sm sm = {};
struct sm *m = &sm.sm;

sm_init(&sm.sm, sm_invariant, NULL, op_states, S_ONLINE);
sm_init(&sm.sm, sm_invariant, NULL, op_states, "test", S_ONLINE);

sm.sm_trigger = BITS(T_CHECKED);
sm_move(m, S_ONLINE);
Expand Down

0 comments on commit 2f5ecff

Please sign in to comment.