diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java index f45e26fe..ddca410d 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java @@ -179,18 +179,18 @@ private R doInitialRoot(Type rootType) { FormatDriver detectedDriver = detectFormat(); StateAndMetadata loadedState = detectedDriver.loadAllState(); root = loadedState.state(); - detectedDriver.onRevisionToSkip(loadedState.revision()); publishFormatDriver(detectedDriver); + detectedDriver.onRevisionToSkip(loadedState.revision()); } catch (UninitializedCollectionException e) { LOGGER.debug("Database collection is uninitialized; will initialize using downstream.initialRoot"); root = callDownstreamInitialRoot(rootType); try (var session = collection.newSession()) { FormatDriver preferredDriver = newPreferredFormatDriver(); preferredDriver.initializeCollection(new StateAndMetadata<>(root, REVISION_ZERO, bosk.diagnosticContext().getAttributes())); - preferredDriver.onRevisionToSkip(REVISION_ONE); // initialRoot handles REVISION_ONE; downstream only needs to know about changes after that session.commitTransactionIfAny(); // We can now publish the driver knowing that the transaction, if there is one, has committed publishFormatDriver(preferredDriver); + preferredDriver.onRevisionToSkip(REVISION_ONE); // initialRoot handles REVISION_ONE; downstream only needs to know about changes after that } catch (RuntimeException | IOException e2) { LOGGER.warn("Failed to initialize database; disconnecting", e2); setDisconnectedDriver(e2); @@ -363,7 +363,6 @@ public void onConnectionSucceeded() throws // before ours (below) because this code runs on the ChangeReceiver thread, which is // the only thread that submits updates downstream. - newDriver.onRevisionToSkip(loadedState.revision()); publishFormatDriver(newDriver); // TODO: It's not clear we actually want loadedState.diagnosticAttributes here. @@ -372,7 +371,12 @@ public void onConnectionSucceeded() throws // which is probably empty because this runs on the ChangeReceiver thread. try (var ___ = bosk.rootReference().diagnosticContext().withOnly(loadedState.diagnosticAttributes())) { downstream.submitReplacement(bosk.rootReference(), loadedState.state()); + LOGGER.debug("Done submitting downstream"); } + + // Now that the state is submitted downstream, we can establish that there's no need to wait + // for a change event with that revision number; a downstream flush is now sufficient. + newDriver.onRevisionToSkip(loadedState.revision()); } else { LOGGER.debug("Running initialRoot action"); runInitialRootAction(initialRootAction);