Skip to content

Commit

Permalink
kvs: remove calls to content.flush
Browse files Browse the repository at this point in the history
Problem: Calls to content.checkpoint-put (via kvs_checkpoint_commit())
will automatically call content.flush.  Therefore the calls to
content.flush in the kvs are duplicates and now unnecessary.

Remove calls to content.flush in the kvs module.
  • Loading branch information
chu11 committed Sep 5, 2024
1 parent 334e64d commit 8e9d319
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 84 deletions.
29 changes: 4 additions & 25 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1041,20 +1041,6 @@ static void kvstxn_apply (kvstxn_t *kt)
assert (wait_get_usecount (wait) > 0);
goto stall;
}
else if (ret == KVSTXN_PROCESS_SYNC_CONTENT_FLUSH) {
/* N.B. futre is managed by kvstxn, should not call
* flux_future_destroy() on it */
flux_future_t *f = kvstxn_sync_content_flush (kt);
if (!f) {
errnum = errno;
goto done;
}
if (flux_future_then (f, -1., kvstxn_apply_cb, kt) < 0) {
errnum = errno;
goto done;
}
goto stall;
}
else if (ret == KVSTXN_PROCESS_SYNC_CHECKPOINT) {
/* N.B. futre is managed by kvstxn, should not call
* flux_future_destroy() on it */
Expand Down Expand Up @@ -2914,22 +2900,15 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len, int *seq)
*/
static int checkpoint_put (flux_t *h, const char *rootref, int rootseq)
{
flux_future_t *f1 = NULL;
flux_future_t *f2 = NULL;
flux_future_t *f = NULL;
int rv = -1;

/* first must ensure all content is flushed */
if (!(f1 = flux_rpc (h, "content.flush", NULL, 0, 0))
|| flux_rpc_get (f1, NULL) < 0)
goto error;

if (!(f2 = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0))
|| flux_rpc_get (f2, NULL) < 0)
if (!(f = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0))
|| flux_rpc_get (f, NULL) < 0)
goto error;
rv = 0;
error:
flux_future_destroy (f1);
flux_future_destroy (f2);
flux_future_destroy (f);
return rv;
}

Expand Down
51 changes: 4 additions & 47 deletions src/modules/kvs/kvstxn.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ struct kvstxn {
char newroot[BLOBREF_MAX_STRING_SIZE];
zlist_t *missing_refs_list;
zlist_t *dirty_cache_entries_list;
flux_future_t *f_sync_content_flush;
flux_future_t *f_sync_checkpoint;
bool processing; /* kvstxn is being processed */
bool merged; /* kvstxn is a merger of transactions */
Expand All @@ -77,7 +76,6 @@ struct kvstxn {
* STORE - generate dirty entries for caller to store
* GENERATE_KEYS - stall until stores complete
* - generate keys modified in txn
* SYNC_CONTENT_FLUSH - call content.flush (for FLUX_KVS_SYNC)
* SYNC_CHECKPOINT - call kvs_checkpoint_commit (for FLUX_KVS_SYNC)
* FINISHED - end state
*
Expand All @@ -87,8 +85,7 @@ struct kvstxn {
* APPLY_OPS -> STORE
* STORE -> GENERATE_KEYS
* GENERATE_KEYS -> FINISHED
* GENERATE_KEYS -> SYNC_CONTENT_FLUSH
* SYNC_CONTENT_FLUSH -> SYNC_CHECKPOINT
* GENERATE_KEYS -> SYNC_CHECKPOINT
* SYNC_CHECKPOINT -> FINISHED
*/
enum {
Expand All @@ -97,9 +94,8 @@ struct kvstxn {
KVSTXN_STATE_APPLY_OPS = 3,
KVSTXN_STATE_STORE = 4,
KVSTXN_STATE_GENERATE_KEYS = 5,
KVSTXN_STATE_SYNC_CONTENT_FLUSH = 6,
KVSTXN_STATE_SYNC_CHECKPOINT = 7,
KVSTXN_STATE_FINISHED = 8,
KVSTXN_STATE_SYNC_CHECKPOINT = 6,
KVSTXN_STATE_FINISHED = 7,
} state;
};

Expand All @@ -116,7 +112,6 @@ static void kvstxn_destroy (kvstxn_t *kt)
zlist_destroy (&kt->missing_refs_list);
if (kt->dirty_cache_entries_list)
zlist_destroy (&kt->dirty_cache_entries_list);
flux_future_destroy (kt->f_sync_content_flush);
flux_future_destroy (kt->f_sync_checkpoint);
free (kt);
}
Expand Down Expand Up @@ -1061,38 +1056,10 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt,
}

if (kt->flags & FLUX_KVS_SYNC)
kt->state = KVSTXN_STATE_SYNC_CONTENT_FLUSH;
kt->state = KVSTXN_STATE_SYNC_CHECKPOINT;
else
kt->state = KVSTXN_STATE_FINISHED;
}
else if (kt->state == KVSTXN_STATE_SYNC_CONTENT_FLUSH) {
if (!(kt->f_sync_content_flush)) {
kt->f_sync_content_flush = flux_rpc (kt->ktm->h,
"content.flush",
NULL,
0,
0);
if (!kt->f_sync_content_flush) {
kt->errnum = errno;
return KVSTXN_PROCESS_ERROR;
}
kt->blocked = 1;
return KVSTXN_PROCESS_SYNC_CONTENT_FLUSH;
}

/* user did not wait for future to complex */
if (!flux_future_is_ready (kt->f_sync_content_flush)) {
kt->blocked = 1;
return KVSTXN_PROCESS_SYNC_CONTENT_FLUSH;
}

if (flux_rpc_get (kt->f_sync_content_flush, NULL) < 0) {
kt->errnum = errno;
return KVSTXN_PROCESS_ERROR;
}

kt->state = KVSTXN_STATE_SYNC_CHECKPOINT;
}
else if (kt->state == KVSTXN_STATE_SYNC_CHECKPOINT) {

if (!(kt->f_sync_checkpoint)) {
Expand Down Expand Up @@ -1199,16 +1166,6 @@ int kvstxn_iter_dirty_cache_entries (kvstxn_t *kt,
return 0;
}

flux_future_t *kvstxn_sync_content_flush (kvstxn_t *kt)
{
if (kt->state != KVSTXN_STATE_SYNC_CONTENT_FLUSH) {
errno = EINVAL;
return NULL;
}

return kt->f_sync_content_flush;
}

flux_future_t *kvstxn_sync_checkpoint (kvstxn_t *kt)
{
if (kt->state != KVSTXN_STATE_SYNC_CHECKPOINT) {
Expand Down
11 changes: 2 additions & 9 deletions src/modules/kvs/kvstxn.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ typedef enum {
KVSTXN_PROCESS_ERROR = 1,
KVSTXN_PROCESS_LOAD_MISSING_REFS = 2,
KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES = 3,
KVSTXN_PROCESS_SYNC_CONTENT_FLUSH = 4,
KVSTXN_PROCESS_SYNC_CHECKPOINT = 5,
KVSTXN_PROCESS_FINISHED = 6,
KVSTXN_PROCESS_SYNC_CHECKPOINT = 4,
KVSTXN_PROCESS_FINISHED = 5,
} kvstxn_process_t;

/* api flags, to be used with kvstxn_mgr_add_transaction()
Expand Down Expand Up @@ -95,7 +94,6 @@ json_t *kvstxn_get_keys (kvstxn_t *kt);
* KVSTXN_PROCESS_LOAD_MISSING_REFS stall & load,
* KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES stall & process dirty cache
* entries,
* KVSTXN_PROCESS_SYNC_CONTENT_FLUSH stall & wait for future to fulfill
* KVSTXN_PROCESS_SYNC_CHECKPOINT stall & wait for future to fulfill
* KVSTXN_PROCESS_FINISHED all done
*
Expand All @@ -106,8 +104,6 @@ json_t *kvstxn_get_keys (kvstxn_t *kt);
* on stall & process dirty cache entries, call
* kvstxn_iter_dirty_cache_entries() to process entries.
*
* on stall & content-flush, call kvstxn_sync_content_flush() to get future.
*
* on stall & checkpoint, call kvstxn_sync_checkpoint() to get future.
*
* on completion, call kvstxn_get_newroot_ref() to get reference to
Expand Down Expand Up @@ -140,9 +136,6 @@ int kvstxn_iter_dirty_cache_entries (kvstxn_t *kt,
*/
void kvstxn_cleanup_dirty_cache_entry (kvstxn_t *kt, struct cache_entry *entry);

/* on stall, get confent.flush future to wait for fulfillment on */
flux_future_t *kvstxn_sync_content_flush (kvstxn_t *kt);

/* on stall, get checkpoint future to wait for fulfillment on */
flux_future_t *kvstxn_sync_checkpoint (kvstxn_t *kt);

Expand Down
3 changes: 0 additions & 3 deletions src/modules/kvs/test/kvstxn.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,6 @@ void kvstxn_basic_tests (void)
ok (kvstxn_iter_dirty_cache_entries (kt, cache_noop_cb, NULL) < 0,
"kvstxn_iter_dirty_cache_entries returns < 0 for call on invalid state");

ok (kvstxn_sync_content_flush (kt) == NULL,
"kvstxn_sync_content_flush returns NULL for call on invalid state");

ok (kvstxn_sync_checkpoint (kt) == NULL,
"kvstxn_sync_checkpoint returns NULL for call on invalid state");

Expand Down

0 comments on commit 8e9d319

Please sign in to comment.