From d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 10 Sep 2014 00:12:30 +0200 Subject: o Initial import of postgresql LISTEN/NOTIFY code. --- .../java/io/trygvis/jz14/db/NativeListener.java | 157 +++++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 src/main/java/io/trygvis/jz14/db/NativeListener.java (limited to 'src/main/java/io/trygvis/jz14/db/NativeListener.java') diff --git a/src/main/java/io/trygvis/jz14/db/NativeListener.java b/src/main/java/io/trygvis/jz14/db/NativeListener.java new file mode 100644 index 0000000..c44a7cc --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NativeListener.java @@ -0,0 +1,157 @@ +package io.trygvis.jz14.db; + +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; +import org.slf4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.DbListener.DbListenerConfig; +import static io.trygvis.jz14.db.DbListener.NewItemCallback; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static org.slf4j.LoggerFactory.getLogger; + +class NativeListener implements Listener { + private final Logger log = getLogger(getClass()); + + private final AtomicBoolean shouldRun = new AtomicBoolean(true); + private final DbListenerConfig config; + private final Supplier> connectionSupplier; + private final NewItemCallback callback; + + private PostgresConnection c; + private long lastLiveCheck; + + public NativeListener(DbListenerConfig config, Supplier> connectionSupplier, NewItemCallback callback) { + this.config = config; + this.connectionSupplier = connectionSupplier; + this.callback = callback; + } + + public synchronized void close() { + shouldRun.set(false); + notifyAll(); + } + + @Override + public void run() { + while (shouldRun.get()) { + try { + doIt(); + } catch (Throwable e) { + log.error("doIt failed", e); + try { + sleep(config.listenerFailureSleepInterval); + } catch (InterruptedException ex2) { + if (!shouldRun.get()) { + log.error("Got interrupted.", ex2); + } + } + } + } + } + + private void doIt() throws Exception { + + if (c == null) { + log.debug("Connecting to database"); + + try { + c = connectAndListen(); + } catch (Throwable ex) { + onError("Unable to connect to database", ex); + return; + } + } + + long now = currentTimeMillis(); + + if (now - lastLiveCheck > config.listenerConnectionLiveCheckInterval) { + log.debug("Doing live check"); + try { + doLiveCheck(); + lastLiveCheck = now; + } catch (Exception ex) { + onError("Live check failed", ex); + return; + } + } + + PGNotification[] notifications; + try { + notifications = c.underlying.getNotifications(); + } catch (SQLException e) { + onError("Error while checking for notifications on connection.", e); + return; + } + + if (notifications != null) { + List strings = new ArrayList<>(); + for (PGNotification notification : notifications) { + String parameter = notification.getParameter(); + if (parameter == null) { + continue; + } + + parameter = parameter.trim(); + + if (!parameter.isEmpty()) { + strings.add(parameter); + } + } + + log.debug("Got notification: parameters={}", strings); + + try { + callback.newItem(true, strings); + } catch (Exception ex) { + onError("Notification handler failed", ex); + return; + } + } + + sleep(config.listenerSleepInterval); + } + + private void onError(String msg, Throwable ex) throws InterruptedException { + log.info(msg, ex); + + // Do a last attempt at closing the connection, the connection pool might appreciate it. + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + log.info("Exception when closing connection after error", e); + } + } + c = null; + log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval); + sleep(config.listenerFailureSleepInterval); + log.info("Resuming work after failure"); + } + + private PostgresConnection connectAndListen() throws SQLException { + PostgresConnection c = connectionSupplier.get(); + c.sqlConnection.setAutoCommit(true); + + lastLiveCheck = currentTimeMillis(); + + PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\""); + s.execute(); + s.close(); + return c; + } + + private void doLiveCheck() throws SQLException { + PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1"); + s.executeQuery().close(); + s.close(); + } +} -- cgit v1.2.3