Skip to content

Commit

Permalink
Closer!
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle committed Sep 28, 2024
1 parent 11740f5 commit 86b611b
Showing 1 changed file with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand Down Expand Up @@ -66,18 +67,18 @@ public class PostgresDriver implements BoskDriver {
/**
* The thread that does the Postgres LISTEN
*/
final ScheduledExecutorService listener = Executors.newScheduledThreadPool(1);
final ScheduledExecutorService listener;

private final AtomicLong lastChangeSubmittedDownstream = new AtomicLong(-1);

PostgresDriver(
ConnectionSource cs,
RootReference<?> rootRef,
BoskInfo<?> bosk,
ObjectMapper mapper,
BoskDriver downstream
) {
this.downstream = requireNonNull(downstream);
this.rootRef = requireNonNull(rootRef);
this.rootRef = requireNonNull(bosk.rootReference());
this.mapper = requireNonNull(mapper);
try {
connectionSource = cs;
Expand All @@ -91,6 +92,12 @@ public class PostgresDriver implements BoskDriver {
} catch (SQLException e) {
throw new IllegalStateException("Unable to access PGConnection", e);
}
listener = Executors.newScheduledThreadPool(1, r ->
new Thread(r, "Postgres listener \""
+ bosk.name()
+ "\" "
+ bosk.instanceID())
);
this.listener.scheduleWithFixedDelay(this::listenerLoop, 0, 5, SECONDS);
}

Expand All @@ -103,6 +110,7 @@ private void listenerLoop() {
while (isOpen.get()) {
PGNotification[] notifications = null;
try {
LOGGER.debug("Asking for notifications");
notifications = listenerConnection.unwrap(PGConnection.class).getNotifications();
} catch (PSQLException e) {
if (isOpen.get()) {
Expand All @@ -111,7 +119,10 @@ private void listenerLoop() {
continue;
}
}
if (notifications != null) {
if (notifications == null) {
LOGGER.debug("Notifications is null");
} else {
LOGGER.debug("Got {} notifications", notifications.length);
for (PGNotification n : notifications) {
processNotification(Notification.from(n));
}
Expand All @@ -123,6 +134,7 @@ private void listenerLoop() {
}

private void processNotification(Notification n) throws SQLException {
LOGGER.debug("Processing notification {}", n.parameter());
try (
var c = connectionSource.get();
var q = c.prepareStatement("SELECT ref, new_state, diagnostics, id FROM bosk_changes WHERE id = ?::int");
Expand All @@ -132,8 +144,14 @@ private void processNotification(Notification n) throws SQLException {
var ref = rs.getString(1);
var newState = rs.getString(2);
var diagnostics = rs.getString(3);
record Change(String ref, String newState, String diagnostics){}
LOGGER.debug("Received change notification: {}", new Change(ref, newState, diagnostics));
long changeID = rs.getLong(4);

if (LOGGER.isTraceEnabled()) {
record Change(String ref, String newState, String diagnostics){}
LOGGER.trace("Received change {}: {}", changeID, new Change(ref, newState, diagnostics));
} else {
LOGGER.debug("Received change {}: {}", changeID, ref);
}

MapValue<String> diagnosticAttributes;
if (diagnostics == null) {
Expand All @@ -157,7 +175,6 @@ record Change(String ref, String newState, String diagnostics){}
LOGGER.debug("Downstream submitReplacement({}, ...)", target);
downstream.submitReplacement(target, newValue);
}
long changeID = rs.getLong(4);
long prev = lastChangeSubmittedDownstream.getAndSet(changeID);
if (prev >= changeID) {
LOGGER.error("Change ID did not increase; expected {} > {}", changeID, prev);
Expand All @@ -179,7 +196,7 @@ public static <RR extends StateTreeNode> PostgresDriverFactory<RR> factory(
Function<BoskInfo<RR>, ObjectMapper> objectMapperFactory
) {
return (b, d) -> new PostgresDriver(
connectionSource, b.rootReference(), objectMapperFactory.apply(b), d
connectionSource, b, objectMapperFactory.apply(b), d
);
}

Expand Down Expand Up @@ -375,8 +392,10 @@ public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identif

@Override
public void flush() throws IOException, InterruptedException {
try {
long currentChangeID = S.latestChangeID(connectionSource.get());
try (
var connection = connectionSource.get();
){
long currentChangeID = S.latestChangeID(connection);
// Wait for any background tasks to finish
background.submit(()->{}).get();
// Wait for any pending notifications
Expand Down

0 comments on commit 86b611b

Please sign in to comment.