diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 336295ed80c9..d8f8173fcccc 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -1068,20 +1068,6 @@ static void kvstxn_apply (kvstxn_t *kt) assert (wait_get_usecount (wait) > 0); goto stall; } - else if (ret == KVSTXN_PROCESS_SYNC_CONTENT_FLUSH) { - /* N.B. futre is managed by kvstxn, should not call - * flux_future_destroy() on it */ - flux_future_t *f = kvstxn_sync_content_flush (kt); - if (!f) { - errnum = errno; - goto done; - } - if (flux_future_then (f, -1., kvstxn_apply_cb, kt) < 0) { - errnum = errno; - goto done; - } - goto stall; - } else if (ret == KVSTXN_PROCESS_SYNC_CHECKPOINT) { /* N.B. futre is managed by kvstxn, should not call * flux_future_destroy() on it */ @@ -3142,22 +3128,15 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len, int *seq) */ static int checkpoint_put (flux_t *h, const char *rootref, int rootseq) { - flux_future_t *f1 = NULL; - flux_future_t *f2 = NULL; + flux_future_t *f = NULL; int rv = -1; - /* first must ensure all content is flushed */ - if (!(f1 = flux_rpc (h, "content.flush", NULL, 0, 0)) - || flux_rpc_get (f1, NULL) < 0) - goto error; - - if (!(f2 = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0)) - || flux_rpc_get (f2, NULL) < 0) + if (!(f = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0, 0)) + || flux_rpc_get (f, NULL) < 0) goto error; rv = 0; error: - flux_future_destroy (f1); - flux_future_destroy (f2); + flux_future_destroy (f); return rv; } diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 1a7e1eea99c4..83fd8bb4b90d 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -61,7 +61,6 @@ struct kvstxn { char newroot[BLOBREF_MAX_STRING_SIZE]; zlist_t *missing_refs_list; zlist_t *dirty_cache_entries_list; - flux_future_t *f_sync_content_flush; flux_future_t *f_sync_checkpoint; bool processing; /* kvstxn is being processed */ bool merged; /* kvstxn is a merger of transactions */ @@ -77,7 +76,6 @@ struct kvstxn { * STORE - generate dirty entries for caller to store * GENERATE_KEYS - stall until stores complete * - generate keys modified in txn - * SYNC_CONTENT_FLUSH - call content.flush (for FLUX_KVS_SYNC) * SYNC_CHECKPOINT - call kvs_checkpoint_commit (for FLUX_KVS_SYNC) * FINISHED - end state * @@ -87,8 +85,7 @@ struct kvstxn { * APPLY_OPS -> STORE * STORE -> GENERATE_KEYS * GENERATE_KEYS -> FINISHED - * GENERATE_KEYS -> SYNC_CONTENT_FLUSH - * SYNC_CONTENT_FLUSH -> SYNC_CHECKPOINT + * GENERATE_KEYS -> SYNC_CHECKPOINT * SYNC_CHECKPOINT -> FINISHED */ enum { @@ -97,9 +94,8 @@ struct kvstxn { KVSTXN_STATE_APPLY_OPS = 3, KVSTXN_STATE_STORE = 4, KVSTXN_STATE_GENERATE_KEYS = 5, - KVSTXN_STATE_SYNC_CONTENT_FLUSH = 6, - KVSTXN_STATE_SYNC_CHECKPOINT = 7, - KVSTXN_STATE_FINISHED = 8, + KVSTXN_STATE_SYNC_CHECKPOINT = 6, + KVSTXN_STATE_FINISHED = 7, } state; }; @@ -116,7 +112,6 @@ static void kvstxn_destroy (kvstxn_t *kt) zlist_destroy (&kt->missing_refs_list); if (kt->dirty_cache_entries_list) zlist_destroy (&kt->dirty_cache_entries_list); - flux_future_destroy (kt->f_sync_content_flush); flux_future_destroy (kt->f_sync_checkpoint); free (kt); } @@ -1085,38 +1080,10 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt, } if (kt->flags & FLUX_KVS_SYNC) - kt->state = KVSTXN_STATE_SYNC_CONTENT_FLUSH; + kt->state = KVSTXN_STATE_SYNC_CHECKPOINT; else kt->state = KVSTXN_STATE_FINISHED; } - else if (kt->state == KVSTXN_STATE_SYNC_CONTENT_FLUSH) { - if (!(kt->f_sync_content_flush)) { - kt->f_sync_content_flush = flux_rpc (kt->ktm->h, - "content.flush", - NULL, - 0, - 0); - if (!kt->f_sync_content_flush) { - kt->errnum = errno; - return KVSTXN_PROCESS_ERROR; - } - kt->blocked = 1; - return KVSTXN_PROCESS_SYNC_CONTENT_FLUSH; - } - - /* user did not wait for future to complex */ - if (!flux_future_is_ready (kt->f_sync_content_flush)) { - kt->blocked = 1; - return KVSTXN_PROCESS_SYNC_CONTENT_FLUSH; - } - - if (flux_rpc_get (kt->f_sync_content_flush, NULL) < 0) { - kt->errnum = errno; - return KVSTXN_PROCESS_ERROR; - } - - kt->state = KVSTXN_STATE_SYNC_CHECKPOINT; - } else if (kt->state == KVSTXN_STATE_SYNC_CHECKPOINT) { if (!(kt->f_sync_checkpoint)) { @@ -1223,16 +1190,6 @@ int kvstxn_iter_dirty_cache_entries (kvstxn_t *kt, return 0; } -flux_future_t *kvstxn_sync_content_flush (kvstxn_t *kt) -{ - if (kt->state != KVSTXN_STATE_SYNC_CONTENT_FLUSH) { - errno = EINVAL; - return NULL; - } - - return kt->f_sync_content_flush; -} - flux_future_t *kvstxn_sync_checkpoint (kvstxn_t *kt) { if (kt->state != KVSTXN_STATE_SYNC_CHECKPOINT) { diff --git a/src/modules/kvs/kvstxn.h b/src/modules/kvs/kvstxn.h index 1e715cf35b12..8de66dd172fb 100644 --- a/src/modules/kvs/kvstxn.h +++ b/src/modules/kvs/kvstxn.h @@ -22,9 +22,8 @@ typedef enum { KVSTXN_PROCESS_ERROR = 1, KVSTXN_PROCESS_LOAD_MISSING_REFS = 2, KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES = 3, - KVSTXN_PROCESS_SYNC_CONTENT_FLUSH = 4, - KVSTXN_PROCESS_SYNC_CHECKPOINT = 5, - KVSTXN_PROCESS_FINISHED = 6, + KVSTXN_PROCESS_SYNC_CHECKPOINT = 4, + KVSTXN_PROCESS_FINISHED = 5, } kvstxn_process_t; /* api flags, to be used with kvstxn_mgr_add_transaction() @@ -95,7 +94,6 @@ json_t *kvstxn_get_keys (kvstxn_t *kt); * KVSTXN_PROCESS_LOAD_MISSING_REFS stall & load, * KVSTXN_PROCESS_DIRTY_CACHE_ENTRIES stall & process dirty cache * entries, - * KVSTXN_PROCESS_SYNC_CONTENT_FLUSH stall & wait for future to fulfill * KVSTXN_PROCESS_SYNC_CHECKPOINT stall & wait for future to fulfill * KVSTXN_PROCESS_FINISHED all done * @@ -106,8 +104,6 @@ json_t *kvstxn_get_keys (kvstxn_t *kt); * on stall & process dirty cache entries, call * kvstxn_iter_dirty_cache_entries() to process entries. * - * on stall & content-flush, call kvstxn_sync_content_flush() to get future. - * * on stall & checkpoint, call kvstxn_sync_checkpoint() to get future. * * on completion, call kvstxn_get_newroot_ref() to get reference to @@ -140,9 +136,6 @@ int kvstxn_iter_dirty_cache_entries (kvstxn_t *kt, */ void kvstxn_cleanup_dirty_cache_entry (kvstxn_t *kt, struct cache_entry *entry); -/* on stall, get confent.flush future to wait for fulfillment on */ -flux_future_t *kvstxn_sync_content_flush (kvstxn_t *kt); - /* on stall, get checkpoint future to wait for fulfillment on */ flux_future_t *kvstxn_sync_checkpoint (kvstxn_t *kt); diff --git a/src/modules/kvs/test/kvstxn.c b/src/modules/kvs/test/kvstxn.c index 301925aaa337..77f1cda17e93 100644 --- a/src/modules/kvs/test/kvstxn.c +++ b/src/modules/kvs/test/kvstxn.c @@ -700,9 +700,6 @@ void kvstxn_basic_tests (void) ok (kvstxn_iter_dirty_cache_entries (kt, cache_noop_cb, NULL) < 0, "kvstxn_iter_dirty_cache_entries returns < 0 for call on invalid state"); - ok (kvstxn_sync_content_flush (kt) == NULL, - "kvstxn_sync_content_flush returns NULL for call on invalid state"); - ok (kvstxn_sync_checkpoint (kt) == NULL, "kvstxn_sync_checkpoint returns NULL for call on invalid state");