From d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 10 Sep 2014 00:12:30 +0200 Subject: o Initial import of postgresql LISTEN/NOTIFY code. --- src/main/java/io/trygvis/jz14/db/NgListener.java | 126 +++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 src/main/java/io/trygvis/jz14/db/NgListener.java (limited to 'src/main/java/io/trygvis/jz14/db/NgListener.java') diff --git a/src/main/java/io/trygvis/jz14/db/NgListener.java b/src/main/java/io/trygvis/jz14/db/NgListener.java new file mode 100644 index 0000000..b875ea1 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NgListener.java @@ -0,0 +1,126 @@ +package io.trygvis.jz14.db; + +import com.impossibl.postgres.api.jdbc.PGConnection; +import com.impossibl.postgres.api.jdbc.PGNotificationListener; +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.slf4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.DbListener.DbListenerConfig; +import static io.trygvis.jz14.db.DbListener.NewItemCallback; +import static java.lang.Thread.sleep; +import static org.slf4j.LoggerFactory.getLogger; + +class NgListener implements Listener, PGNotificationListener { + private final Logger log = getLogger(getClass()); + + private final AtomicBoolean shouldRun = new AtomicBoolean(true); + private final DbListenerConfig config; + private final Supplier> connectionSupplier; + private final NewItemCallback callback; + + private PostgresConnection c; + + public NgListener(DbListenerConfig config, Supplier> connectionSupplier, NewItemCallback callback) { + this.config = config; + this.connectionSupplier = connectionSupplier; + this.callback = callback; + } + + public synchronized void close() { + shouldRun.set(false); + notifyAll(); + } + + @Override + public void run() { + while (shouldRun.get()) { + try { + doIt(); + } catch (Throwable e) { + log.error("doIt failed", e); + try { + sleep(config.listenerFailureSleepInterval); + } catch (InterruptedException ex2) { + if (!shouldRun.get()) { + log.error("Got interrupted.", ex2); + } + } + } + } + } + + private void doIt() throws Exception { + + if (c == null) { + log.debug("Connecting to database"); + + try { + c = connectAndListen(); + } catch (Throwable ex) { + onError("Unable to connect to database", ex); + return; + } + } + + log.debug("Doing live check"); + try { + doLiveCheck(); + } catch (Exception ex) { + onError("Live check failed", ex); + return; + } + + sleep(config.listenerConnectionLiveCheckInterval); + } + + private void onError(String msg, Throwable ex) throws InterruptedException { + log.info(msg, ex); + + // Do a last attempt at closing the connection, the connection pool might appreciate it. + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + log.info("Exception when closing connection after error", e); + } + } + c = null; + log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval); + sleep(config.listenerFailureSleepInterval); + log.info("Resuming work after failure"); + } + + private PostgresConnection connectAndListen() throws SQLException { + PostgresConnection c = connectionSupplier.get(); + c.sqlConnection.setAutoCommit(true); + + c.underlying.addNotificationListener(this); + + PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\""); + s.execute(); + s.close(); + + return c; + } + + private void doLiveCheck() throws SQLException { + PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1"); + s.executeQuery().close(); + s.close(); + } + + @Override + public void notification(int processId, String channelName, String payload) { + try { + callback.newItem(true, Collections.singletonList(payload)); + } catch (Exception e) { + log.warn("Exception while processing callback, payload=" + payload, e); + } + } +} -- cgit v1.2.3