From 86b611ba66f5345fa359983684b4932da6d58116 Mon Sep 17 00:00:00 2001
From: Patrick Doyle
Date: Sat, 28 Sep 2024 13:47:13 -0400
Subject: [PATCH] Closer!
---
.../drivers/postgresql/PostgresDriver.java | 39 ++++++++++++++-----
1 file changed, 29 insertions(+), 10 deletions(-)
diff --git a/bosk-postgres/src/main/java/works/bosk/drivers/postgresql/PostgresDriver.java b/bosk-postgres/src/main/java/works/bosk/drivers/postgresql/PostgresDriver.java
index cd02e0ad..e5636f05 100644
--- a/bosk-postgres/src/main/java/works/bosk/drivers/postgresql/PostgresDriver.java
+++ b/bosk-postgres/src/main/java/works/bosk/drivers/postgresql/PostgresDriver.java
@@ -16,6 +16,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -66,18 +67,18 @@ public class PostgresDriver implements BoskDriver {
/**
* The thread that does the Postgres LISTEN
*/
- final ScheduledExecutorService listener = Executors.newScheduledThreadPool(1);
+ final ScheduledExecutorService listener;
private final AtomicLong lastChangeSubmittedDownstream = new AtomicLong(-1);
PostgresDriver(
ConnectionSource cs,
- RootReference> rootRef,
+ BoskInfo> bosk,
ObjectMapper mapper,
BoskDriver downstream
) {
this.downstream = requireNonNull(downstream);
- this.rootRef = requireNonNull(rootRef);
+ this.rootRef = requireNonNull(bosk.rootReference());
this.mapper = requireNonNull(mapper);
try {
connectionSource = cs;
@@ -91,6 +92,12 @@ public class PostgresDriver implements BoskDriver {
} catch (SQLException e) {
throw new IllegalStateException("Unable to access PGConnection", e);
}
+ listener = Executors.newScheduledThreadPool(1, r ->
+ new Thread(r, "Postgres listener \""
+ + bosk.name()
+ + "\" "
+ + bosk.instanceID())
+ );
this.listener.scheduleWithFixedDelay(this::listenerLoop, 0, 5, SECONDS);
}
@@ -103,6 +110,7 @@ private void listenerLoop() {
while (isOpen.get()) {
PGNotification[] notifications = null;
try {
+ LOGGER.debug("Asking for notifications");
notifications = listenerConnection.unwrap(PGConnection.class).getNotifications();
} catch (PSQLException e) {
if (isOpen.get()) {
@@ -111,7 +119,10 @@ private void listenerLoop() {
continue;
}
}
- if (notifications != null) {
+ if (notifications == null) {
+ LOGGER.debug("Notifications is null");
+ } else {
+ LOGGER.debug("Got {} notifications", notifications.length);
for (PGNotification n : notifications) {
processNotification(Notification.from(n));
}
@@ -123,6 +134,7 @@ private void listenerLoop() {
}
private void processNotification(Notification n) throws SQLException {
+ LOGGER.debug("Processing notification {}", n.parameter());
try (
var c = connectionSource.get();
var q = c.prepareStatement("SELECT ref, new_state, diagnostics, id FROM bosk_changes WHERE id = ?::int");
@@ -132,8 +144,14 @@ private void processNotification(Notification n) throws SQLException {
var ref = rs.getString(1);
var newState = rs.getString(2);
var diagnostics = rs.getString(3);
- record Change(String ref, String newState, String diagnostics){}
- LOGGER.debug("Received change notification: {}", new Change(ref, newState, diagnostics));
+ long changeID = rs.getLong(4);
+
+ if (LOGGER.isTraceEnabled()) {
+ record Change(String ref, String newState, String diagnostics){}
+ LOGGER.trace("Received change {}: {}", changeID, new Change(ref, newState, diagnostics));
+ } else {
+ LOGGER.debug("Received change {}: {}", changeID, ref);
+ }
MapValue diagnosticAttributes;
if (diagnostics == null) {
@@ -157,7 +175,6 @@ record Change(String ref, String newState, String diagnostics){}
LOGGER.debug("Downstream submitReplacement({}, ...)", target);
downstream.submitReplacement(target, newValue);
}
- long changeID = rs.getLong(4);
long prev = lastChangeSubmittedDownstream.getAndSet(changeID);
if (prev >= changeID) {
LOGGER.error("Change ID did not increase; expected {} > {}", changeID, prev);
@@ -179,7 +196,7 @@ public static PostgresDriverFactory factory(
Function, ObjectMapper> objectMapperFactory
) {
return (b, d) -> new PostgresDriver(
- connectionSource, b.rootReference(), objectMapperFactory.apply(b), d
+ connectionSource, b, objectMapperFactory.apply(b), d
);
}
@@ -375,8 +392,10 @@ public void submitConditionalDeletion(Reference target, Reference{}).get();
// Wait for any pending notifications