summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2014-09-10 00:12:30 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2014-09-12 09:08:00 +0200
commitd6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc (patch)
tree2e9cd83b8a9425ea184b72c12721d7e08ea87d52
downloadjavazone-2014-master.tar.gz
javazone-2014-master.tar.bz2
javazone-2014-master.tar.xz
javazone-2014-master.zip
o Initial import of postgresql LISTEN/NOTIFY code.HEADmaster
-rw-r--r--.gitignore3
-rw-r--r--LICENSE.txt21
-rw-r--r--init.sql37
-rw-r--r--pom.xml55
-rw-r--r--src/main/java/io/trygvis/jz14/db/DbListener.java223
-rw-r--r--src/main/java/io/trygvis/jz14/db/Listener.java6
-rw-r--r--src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java44
-rw-r--r--src/main/java/io/trygvis/jz14/db/NativeListener.java157
-rw-r--r--src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java52
-rw-r--r--src/main/java/io/trygvis/jz14/db/NgListener.java126
-rw-r--r--src/main/java/io/trygvis/jz14/db/RobustTimerTask.java47
-rw-r--r--src/main/java/io/trygvis/jz14/demo/Db.java67
-rw-r--r--src/main/java/io/trygvis/jz14/demo/InserterMain.java37
-rw-r--r--src/main/java/io/trygvis/jz14/demo/KillListenerMain.java58
-rw-r--r--src/main/java/io/trygvis/jz14/demo/ListenerMain.java55
-rw-r--r--src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java8
-rw-r--r--src/main/java/io/trygvis/jz14/demo/NgListenerMain.java7
-rw-r--r--src/main/java/io/trygvis/jz14/demo/NotifierMain.java34
-rw-r--r--src/main/resources/logback.xml15
19 files changed, 1052 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f83e8cf
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+.idea
+target
+*.iml
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..4dc9c3c
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) <2014> Trygve Laugstøl <trygvis@inamo.no>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/init.sql b/init.sql
new file mode 100644
index 0000000..0ec0477
--- /dev/null
+++ b/init.sql
@@ -0,0 +1,37 @@
+-- Create a table for incoming mail
+
+CREATE TABLE mail_raw (
+ id SERIAL PRIMARY KEY,
+ raw TEXT
+);
+
+-- Insert a row into the table;
+
+INSERT INTO mail_raw (raw) VALUES ('my mail');
+NOTIFY mail_raw;
+
+--
+-- Alternate technique with implicit notifies through triggers
+--
+
+CREATE TABLE mail_raw_t (
+ id SERIAL PRIMARY KEY,
+ raw TEXT
+);
+
+-- This trigger function does the notification
+CREATE FUNCTION notify_trigger()
+ RETURNS TRIGGER AS $$
+BEGIN
+ PERFORM pg_notify(TG_TABLE_NAME, NEW.id :: TEXT);
+ RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Create a trigger on the mail_raw_t table and make it run on each new row
+CREATE TRIGGER mail_raw_t_notify
+AFTER INSERT ON mail_raw_t
+FOR EACH ROW
+EXECUTE PROCEDURE notify_trigger();
+
+INSERT INTO mail_raw_t (raw) VALUES ('my mail');
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..c0a5ad5
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,55 @@
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>io.trygvis</groupId>
+ <artifactId>trygvis-parent</artifactId>
+ <version>1</version>
+ </parent>
+ <groupId>io.trygvis.javazone-2014</groupId>
+ <artifactId>javazone-2014</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>9.3-1102-jdbc41</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.impossibl.pgjdbc-ng</groupId>
+ <artifactId>pgjdbc-ng</artifactId>
+ <version>0.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.1.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/src/main/java/io/trygvis/jz14/db/DbListener.java b/src/main/java/io/trygvis/jz14/db/DbListener.java
new file mode 100644
index 0000000..f16f391
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/db/DbListener.java
@@ -0,0 +1,223 @@
+package io.trygvis.jz14.db;
+
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Timer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static io.trygvis.jz14.db.RobustTimerTask.robustTimerTask;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class DbListener implements Closeable {
+ private final Logger log = getLogger(getClass());
+
+ private final DbListenerConfig config;
+ private final Timer timer;
+
+ private final Listener listener;
+
+ @FunctionalInterface
+ public static interface NewItemCallback {
+ void newItem(boolean wasNotified, Iterable<String> parameters) throws Exception;
+ }
+
+ public static final class PostgresConnection<T> implements Closeable {
+ public final Connection sqlConnection;
+ public final T underlying;
+
+ public PostgresConnection(Connection sqlConnection, T underlying) {
+ this.sqlConnection = sqlConnection;
+ this.underlying = underlying;
+ }
+
+ @Override
+ public final void close() {
+ try {
+ sqlConnection.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public abstract static class DbListenerConfig {
+
+ /**
+ * Default scanner delay; 1 minute;
+ */
+ public static final long DEFAULT_SCANNER_DELAY = ms(1, MINUTES);
+
+ /**
+ * Default scanner period; 1 hour.
+ */
+ public static final long DEFAULT_SCANNER_PERIOD = ms(1, HOURS);
+
+ /**
+ * Default connection life check interval; 1 hour;
+ */
+ public static final long DEFAULT_LISTENER_CONNECTION_LIVE_CHECK_INTERVAL = ms(1, HOURS);
+
+ /**
+ * Default sleep time after failure; 1 second;
+ */
+ public static final long DEFAULT_LISTENER_SLEEP_INTERVAL = ms(200, MILLISECONDS);
+
+ /**
+ * Default sleep time after failure; 1 minute;
+ */
+ public static final long DEFAULT_LISTENER_FAILURE_SLEEP_INTERVAL = ms(1, MINUTES);
+
+ /**
+ * The name of the channel to listen on.
+ */
+ public final String name;
+
+ /**
+ * Should the scanner thread be started?
+ */
+ public final boolean runScanner;
+
+ /**
+ * Should the listener thread be started?
+ */
+ public final boolean runListener;
+
+ /**
+ * How long should the scanner wait after start before starting to scan.
+ */
+ public final long scannerDelay;
+
+ /**
+ * How often should the scanner wait between scans.
+ */
+ public long scannerPeriod;
+
+ /**
+ * How long should the listener sleep after each poll.
+ */
+ public long listenerSleepInterval;
+
+ /**
+ * How often should the listener do a live check of the connection.
+ */
+ public long listenerConnectionLiveCheckInterval;
+
+ /**
+ * How long should the listener sleep after a failure.
+ */
+ public long listenerFailureSleepInterval;
+
+ /**
+ * Configures a listener with the default parameters,
+ */
+ public DbListenerConfig(String name) {
+ this(name, true, true, DEFAULT_SCANNER_DELAY, DEFAULT_SCANNER_PERIOD,
+ DEFAULT_LISTENER_SLEEP_INTERVAL, DEFAULT_LISTENER_CONNECTION_LIVE_CHECK_INTERVAL, DEFAULT_LISTENER_FAILURE_SLEEP_INTERVAL);
+ }
+
+ public DbListenerConfig(String name, boolean runScanner, boolean runListener, long scannerDelay, long scannerPeriod,
+ long listenerSleepInterval, long listenerConnectionLiveCheckInterval, long listenerFailureSleepInterval) {
+ this.name = name;
+ this.runScanner = runScanner;
+ this.runListener = runListener;
+ this.scannerDelay = scannerDelay;
+ this.scannerPeriod = scannerPeriod;
+ this.listenerSleepInterval = listenerSleepInterval;
+ this.listenerConnectionLiveCheckInterval = listenerConnectionLiveCheckInterval;
+ this.listenerFailureSleepInterval = listenerFailureSleepInterval;
+ }
+
+ public static long ms(long count, TimeUnit unit) {
+ return MILLISECONDS.convert(count, unit);
+ }
+ }
+
+ public static DbListener nativeDbListener(DbListenerConfig config, NewItemCallback callable,
+ Supplier<PostgresConnection<org.postgresql.PGConnection>> connectionSupplier) throws SQLException {
+ return new DbListener(config, callable, connectionSupplier, null);
+ }
+
+ public static DbListener ngDbListener(DbListenerConfig config, NewItemCallback callable,
+ Supplier<PostgresConnection<com.impossibl.postgres.api.jdbc.PGConnection>> connectionSupplier) throws SQLException {
+ return new DbListener(config, callable, null, connectionSupplier);
+ }
+
+ private DbListener(DbListenerConfig config, NewItemCallback callable,
+ Supplier<PostgresConnection<org.postgresql.PGConnection>> nativeSup,
+ Supplier<PostgresConnection<com.impossibl.postgres.api.jdbc.PGConnection>> ngSup) throws SQLException {
+ this.config = config;
+
+ log.info("DB Listener: {}", config.name);
+ log.info(" Run scanner: {}", config.runScanner);
+ log.info(" Run listener: {}", config.runListener);
+
+ if (config.runScanner) {
+ timer = new Timer("Timer \"" + config.name + "\"", true);
+
+ timer.schedule(robustTimerTask(new NewItemCallbackCallable<>(callable)), config.scannerDelay, config.scannerPeriod);
+ } else {
+ timer = null;
+ }
+
+ if (config.runListener) {
+ try (PostgresConnection<?> pg = (nativeSup != null ? nativeSup : ngSup).get()) {
+ Connection c = pg.sqlConnection;
+ if (c.getMetaData().getDatabaseProductName().toLowerCase().contains("postgresql")) {
+ if (nativeSup != null) {
+ listener = new NativeListener(config, nativeSup, callable);
+ } else {
+ listener = new NgListener(config, ngSup, callable);
+ }
+ Thread thread = new Thread(listener, "LISTEN \"" + config.name + "\"");
+ thread.start();
+ } else {
+ log.info("Mail listener is configured to run, but the database isn't a PostgreSQL database");
+ listener = null;
+ }
+ }
+ } else {
+ listener = null;
+ }
+ }
+
+ public void close() {
+ log.info("Stopping DB listener {}", config.name);
+ if (timer != null) {
+ timer.cancel();
+ }
+
+ if (listener != null) {
+ try {
+ listener.close();
+ } catch (IOException e) {
+ log.warn("Exception while closing listener.", e);
+ }
+ }
+ }
+
+ private class NewItemCallbackCallable<A> implements Callable<A> {
+
+ private final NewItemCallback callback;
+
+ private NewItemCallbackCallable(NewItemCallback callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public A call() throws Exception {
+ callback.newItem(false, Collections.emptyList());
+
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/db/Listener.java b/src/main/java/io/trygvis/jz14/db/Listener.java
new file mode 100644
index 0000000..4e8c7c1
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/db/Listener.java
@@ -0,0 +1,6 @@
+package io.trygvis.jz14.db;
+
+import java.io.Closeable;
+
+public interface Listener extends Runnable, Closeable {
+}
diff --git a/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java b/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java
new file mode 100644
index 0000000..d9db909
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java
@@ -0,0 +1,44 @@
+package io.trygvis.jz14.db;
+
+import io.trygvis.jz14.db.DbListener.PostgresConnection;
+import org.postgresql.PGConnection;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.function.Supplier;
+
+public class NativeConnectionSupplier implements Supplier<PostgresConnection<PGConnection>> {
+ private final DataSource dataSource;
+
+ public NativeConnectionSupplier(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ @Override
+ public PostgresConnection<PGConnection> get() {
+ try {
+ Connection sqlConnection = dataSource.getConnection();
+ PGConnection pgConnection = unwrap(sqlConnection);
+ return new PostgresConnection<>(sqlConnection, pgConnection);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static PGConnection unwrap(Connection c) {
+ if (c instanceof PGConnection) {
+ return (PGConnection) c;
+ }
+
+ /* If you're using Spring, you need to add these two to properly unwrap the underlying PGConnection.
+ if (c instanceof ConnectionHandle) {
+ return unwrap(((ConnectionHandle) c).getInternalConnection());
+ }
+ if (c instanceof ConnectionProxy) {
+ return unwrap(DataSourceUtils.getTargetConnection(c));
+ }
+ */
+ throw new RuntimeException("Could not unwrap connection to a PGConnection: " + c.getClass());
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/db/NativeListener.java b/src/main/java/io/trygvis/jz14/db/NativeListener.java
new file mode 100644
index 0000000..c44a7cc
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/db/NativeListener.java
@@ -0,0 +1,157 @@
+package io.trygvis.jz14.db;
+
+import io.trygvis.jz14.db.DbListener.PostgresConnection;
+import org.postgresql.PGConnection;
+import org.postgresql.PGNotification;
+import org.slf4j.Logger;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static io.trygvis.jz14.db.DbListener.DbListenerConfig;
+import static io.trygvis.jz14.db.DbListener.NewItemCallback;
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+import static org.slf4j.LoggerFactory.getLogger;
+
+class NativeListener implements Listener {
+ private final Logger log = getLogger(getClass());
+
+ private final AtomicBoolean shouldRun = new AtomicBoolean(true);
+ private final DbListenerConfig config;
+ private final Supplier<PostgresConnection<PGConnection>> connectionSupplier;
+ private final NewItemCallback callback;
+
+ private PostgresConnection<PGConnection> c;
+ private long lastLiveCheck;
+
+ public NativeListener(DbListenerConfig config, Supplier<PostgresConnection<PGConnection>> connectionSupplier, NewItemCallback callback) {
+ this.config = config;
+ this.connectionSupplier = connectionSupplier;
+ this.callback = callback;
+ }
+
+ public synchronized void close() {
+ shouldRun.set(false);
+ notifyAll();
+ }
+
+ @Override
+ public void run() {
+ while (shouldRun.get()) {
+ try {
+ doIt();
+ } catch (Throwable e) {
+ log.error("doIt failed", e);
+ try {
+ sleep(config.listenerFailureSleepInterval);
+ } catch (InterruptedException ex2) {
+ if (!shouldRun.get()) {
+ log.error("Got interrupted.", ex2);
+ }
+ }
+ }
+ }
+ }
+
+ private void doIt() throws Exception {
+
+ if (c == null) {
+ log.debug("Connecting to database");
+
+ try {
+ c = connectAndListen();
+ } catch (Throwable ex) {
+ onError("Unable to connect to database", ex);
+ return;
+ }
+ }
+
+ long now = currentTimeMillis();
+
+ if (now - lastLiveCheck > config.listenerConnectionLiveCheckInterval) {
+ log.debug("Doing live check");
+ try {
+ doLiveCheck();
+ lastLiveCheck = now;
+ } catch (Exception ex) {
+ onError("Live check failed", ex);
+ return;
+ }
+ }
+
+ PGNotification[] notifications;
+ try {
+ notifications = c.underlying.getNotifications();
+ } catch (SQLException e) {
+ onError("Error while checking for notifications on connection.", e);
+ return;
+ }
+
+ if (notifications != null) {
+ List<String> strings = new ArrayList<>();
+ for (PGNotification notification : notifications) {
+ String parameter = notification.getParameter();
+ if (parameter == null) {
+ continue;
+ }
+
+ parameter = parameter.trim();
+
+ if (!parameter.isEmpty()) {
+ strings.add(parameter);
+ }
+ }
+
+ log.debug("Got notification: parameters={}", strings);
+
+ try {
+ callback.newItem(true, strings);
+ } catch (Exception ex) {
+ onError("Notification handler failed", ex);
+ return;
+ }
+ }
+
+ sleep(config.listenerSleepInterval);
+ }
+
+ private void onError(String msg, Throwable ex) throws InterruptedException {
+ log.info(msg, ex);
+
+ // Do a last attempt at closing the connection, the connection pool might appreciate it.
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Throwable e) {
+ log.info("Exception when closing connection after error", e);
+ }
+ }
+ c = null;
+ log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval);
+ sleep(config.listenerFailureSleepInterval);
+ log.info("Resuming work after failure");
+ }
+
+ private PostgresConnection<PGConnection> connectAndListen() throws SQLException {
+ PostgresConnection<PGConnection> c = connectionSupplier.get();
+ c.sqlConnection.setAutoCommit(true);
+
+ lastLiveCheck = currentTimeMillis();
+
+ PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\"");
+ s.execute();
+ s.close();
+ return c;
+ }
+
+ private void doLiveCheck() throws SQLException {
+ PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1");
+ s.executeQuery().close();
+ s.close();
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java b/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java
new file mode 100644
index 0000000..4006f1d
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java
@@ -0,0 +1,52 @@
+package io.trygvis.jz14.db;
+
+import com.impossibl.postgres.api.jdbc.PGConnection;
+import io.trygvis.jz14.db.DbListener.PostgresConnection;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.function.Supplier;
+
+public class NgConnectionSupplier implements Supplier<PostgresConnection<PGConnection>> {
+ private final DataSource dataSource;
+
+ public NgConnectionSupplier(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ @Override
+ public PostgresConnection<PGConnection> get() {
+ try {
+ Connection sqlConnection = dataSource.getConnection();
+ PGConnection pgConnection = unwrap(sqlConnection);
+ return new PostgresConnection<>(sqlConnection, pgConnection);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static PGConnection unwrap(Connection c) {
+ if (c instanceof PGConnection) {
+ return (PGConnection) c;
+ }
+
+ /* If you're using Spring, you need to add these two to properly unwrap the underlying PGConnection.
+ if (c instanceof ConnectionHandle) {
+ return unwrap(((ConnectionHandle) c).getInternalConnection());
+ }
+ if (c instanceof ConnectionProxy) {
+ return unwrap(DataSourceUtils.getTargetConnection(c));
+ }
+ */
+ Class<? extends Connection> klass = c.getClass();
+
+ Class<?>[] interfaces = klass.getInterfaces();
+ System.out.println("interfaces.length = " + interfaces.length);
+ for (Class<?> anInterface : interfaces) {
+ System.out.println("anInterface = " + anInterface);
+ }
+
+ throw new RuntimeException("Could not unwrap connection to a PGConnection: " + c.getClass());
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/db/NgListener.java b/src/main/java/io/trygvis/jz14/db/NgListener.java
new file mode 100644
index 0000000..b875ea1
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/db/NgListener.java
@@ -0,0 +1,126 @@
+package io.trygvis.jz14.db;
+
+import com.impossibl.postgres.api.jdbc.PGConnection;
+import com.impossibl.postgres.api.jdbc.PGNotificationListener;
+import io.trygvis.jz14.db.DbListener.PostgresConnection;
+import org.slf4j.Logger;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static io.trygvis.jz14.db.DbListener.DbListenerConfig;
+import static io.trygvis.jz14.db.DbListener.NewItemCallback;
+import static java.lang.Thread.sleep;
+import static org.slf4j.LoggerFactory.getLogger;
+
+class NgListener implements Listener, PGNotificationListener {
+ private final Logger log = getLogger(getClass());
+
+ private final AtomicBoolean shouldRun = new AtomicBoolean(true);
+ private final DbListenerConfig config;
+ private final Supplier<PostgresConnection<PGConnection>> connectionSupplier;
+ private final NewItemCallback callback;
+
+ private PostgresConnection<PGConnection> c;
+
+ public NgListener(DbListenerConfig config, Supplier<PostgresConnection<PGConnection>> connectionSupplier, NewItemCallback callback) {
+ this.config = config;
+ this.connectionSupplier = connectionSupplier;
+ this.callback = callback;
+ }
+
+ public synchronized void close() {
+ shouldRun.set(false);
+ notifyAll();
+ }
+
+ @Override
+ public void run() {
+ while (shouldRun.get()) {
+ try {
+ doIt();
+ } catch (Throwable e) {
+ log.error("doIt failed", e);
+ try {
+ sleep(config.listenerFailureSleepInterval);
+ } catch (InterruptedException ex2) {
+ if (!shouldRun.get()) {
+ log.error("Got interrupted.", ex2);
+ }
+ }
+ }
+ }
+ }
+
+ private void doIt() throws Exception {
+
+ if (c == null) {
+ log.debug("Connecting to database");
+
+ try {
+ c = connectAndListen();
+ } catch (Throwable ex) {
+ onError("Unable to connect to database", ex);
+ return;
+ }
+ }
+
+ log.debug("Doing live check");
+ try {
+ doLiveCheck();
+ } catch (Exception ex) {
+ onError("Live check failed", ex);
+ return;
+ }
+
+ sleep(config.listenerConnectionLiveCheckInterval);
+ }
+
+ private void onError(String msg, Throwable ex) throws InterruptedException {
+ log.info(msg, ex);
+
+ // Do a last attempt at closing the connection, the connection pool might appreciate it.
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Throwable e) {
+ log.info("Exception when closing connection after error", e);
+ }
+ }
+ c = null;
+ log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval);
+ sleep(config.listenerFailureSleepInterval);
+ log.info("Resuming work after failure");
+ }
+
+ private PostgresConnection<PGConnection> connectAndListen() throws SQLException {
+ PostgresConnection<PGConnection> c = connectionSupplier.get();
+ c.sqlConnection.setAutoCommit(true);
+
+ c.underlying.addNotificationListener(this);
+
+ PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\"");
+ s.execute();
+ s.close();
+
+ return c;
+ }
+
+ private void doLiveCheck() throws SQLException {
+ PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1");
+ s.executeQuery().close();
+ s.close();
+ }
+
+ @Override
+ public void notification(int processId, String channelName, String payload) {
+ try {
+ callback.newItem(true, Collections.singletonList(payload));
+ } catch (Exception e) {
+ log.warn("Exception while processing callback, payload=" + payload, e);
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java b/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java
new file mode 100644
index 0000000..af55aa7
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java
@@ -0,0 +1,47 @@
+package io.trygvis.jz14.db;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+
+public abstract class RobustTimerTask extends TimerTask {
+ private static final Logger log = LoggerFactory.getLogger(RobustTimerTask.class);
+
+ private final Class<?> klass;
+
+ private RobustTimerTask(Class<?> klass) {
+ this.klass = klass;
+ }
+
+ protected RobustTimerTask() {
+ this.klass = getClass();
+ }
+
+ public static TimerTask robustTimerTask(final TimerTask timerTask) {
+ return new RobustTimerTask(timerTask.getClass()) {
+ public void timerRun() throws Exception {
+ timerTask.run();
+ }
+ };
+ }
+
+ public static <T> TimerTask robustTimerTask(final Callable<T> callable) {
+ return new RobustTimerTask(callable.getClass()) {
+ public void timerRun() throws Exception {
+ callable.call();
+ }
+ };
+ }
+
+ public void run() {
+ try {
+ timerRun();
+ } catch (Exception e) {
+ log.error("Timer task " + klass.getName() + " failed.", e);
+ }
+ }
+
+ public abstract void timerRun() throws Exception;
+}
diff --git a/src/main/java/io/trygvis/jz14/demo/Db.java b/src/main/java/io/trygvis/jz14/demo/Db.java
new file mode 100644
index 0000000..9de8d7d
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/demo/Db.java
@@ -0,0 +1,67 @@
+package io.trygvis.jz14.demo;
+
+import com.impossibl.postgres.jdbc.PGDataSource;
+import org.postgresql.ds.PGPoolingDataSource;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import static java.util.Optional.ofNullable;
+
+public class Db {
+
+ public final String applicationName;
+ public final boolean ng;
+
+ public Db(String applicationName) {
+ this.applicationName = applicationName;
+ ng = false;
+ }
+
+ public Db(String applicationName, boolean ng) {
+ this.applicationName = applicationName;
+ this.ng = ng;
+ }
+
+ public Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(jdbcUrl(), jdbcUsername(), jdbcPassword());
+ }
+
+ private String jdbcPassword() {
+ return ofNullable(System.getenv("password")).orElseGet(() -> System.getProperty("user.name"));
+ }
+
+ private String jdbcUsername() {
+ return ofNullable(System.getenv("username")).orElseGet(() -> System.getProperty("user.name"));
+ }
+
+ private String jdbcDatabase() {
+ return ofNullable(System.getenv("database")).orElseGet(() -> System.getProperty("user.name"));
+ }
+
+ private String jdbcUrl() {
+ if (ng) {
+ return "jdbc:pgsql://localhost/" + System.getProperty("user.name") + "?ApplicationName=" + applicationName;
+ }
+ return "jdbc:postgresql://localhost/" + System.getProperty("user.name") + "?ApplicationName=" + applicationName;
+ }
+
+ public DataSource dataSource() throws SQLException {
+ if (!ng) {
+ PGPoolingDataSource ds = new PGPoolingDataSource();
+ ds.setUrl(jdbcUrl());
+ ds.setUser(jdbcUsername());
+ ds.setPassword(jdbcPassword());
+ return ds;
+ } else {
+ PGDataSource ds = new PGDataSource();
+ ds.setHost("localhost");
+ ds.setUser(jdbcUsername());
+ ds.setPassword(jdbcPassword());
+ ds.setDatabase(jdbcDatabase());
+ return ds;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/demo/InserterMain.java b/src/main/java/io/trygvis/jz14/demo/InserterMain.java
new file mode 100644
index 0000000..1192a09
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/demo/InserterMain.java
@@ -0,0 +1,37 @@
+package io.trygvis.jz14.demo;
+
+import org.slf4j.Logger;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Random;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class InserterMain {
+ Logger log = getLogger(getClass());
+ Db db = new Db("inserter");
+ Random r = new Random();
+
+ public static void main(String[] args) throws Exception {
+ new InserterMain().main();
+ }
+
+ public void main() throws Exception {
+ Connection c = db.getConnection();
+ c.setAutoCommit(false);
+
+ PreparedStatement stmt = c.prepareStatement("INSERT INTO mail_raw_t(raw) VALUES(?)");
+ int count = 1 + r.nextInt(9);
+
+ for (int i = 0; i < count; i++) {
+ stmt.setString(1, "mail #" + i);
+ stmt.execute();
+ }
+ c.commit();
+
+ log.info("INSERT performed, count={}", count);
+
+ c.close();
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java b/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java
new file mode 100644
index 0000000..6ea982a
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java
@@ -0,0 +1,58 @@
+package io.trygvis.jz14.demo;
+
+import io.trygvis.jz14.db.DbListener;
+import org.slf4j.Logger;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Kills all connections from {@link io.trygvis.jz14.demo.ListenerMain}.
+ * <p/>
+ * Note that this is slightly different from a postgresql backend failing, as the client will be told that it was
+ * killed, but the effect in this code is the same.
+ */
+public class KillListenerMain implements DbListener.NewItemCallback {
+ Logger log = getLogger(getClass());
+ Db db = new Db("killer");
+
+ public static void main(String[] args) throws Exception {
+ new KillListenerMain().main();
+ }
+
+ private void main() throws Exception {
+ Connection c = db.getConnection();
+ Statement s = c.createStatement();
+
+ ResultSet rs = s.executeQuery("SELECT pid FROM pg_stat_activity WHERE usename=user AND application_name='listener'");
+ if (!rs.next()) {
+ System.out.println("Couldn't find any listener");
+ } else {
+ List<Integer> pids = new ArrayList<>();
+
+ do {
+ int pid = rs.getInt("pid");
+ pids.add(pid);
+ } while (rs.next());
+
+ for (Integer pid : pids) {
+ System.out.println("Killing " + pid);
+ ResultSet rs2 = s.executeQuery("select pg_terminate_backend(" + pid + ")");
+ rs2.next();
+ System.out.println("rs2.getBoolean(1) = " + rs2.getBoolean(1));
+ }
+ }
+
+ c.close();
+ }
+
+ @Override
+ public void newItem(boolean wasNotified, Iterable<String> parameters) throws Exception {
+ log.info("ListenerMain.newItem: wasNotified={}, parameters={}", wasNotified, parameters);
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/demo/ListenerMain.java b/src/main/java/io/trygvis/jz14/demo/ListenerMain.java
new file mode 100644
index 0000000..82f7c27
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/demo/ListenerMain.java
@@ -0,0 +1,55 @@
+package io.trygvis.jz14.demo;
+
+import io.trygvis.jz14.db.DbListener;
+import io.trygvis.jz14.db.DbListener.DbListenerConfig;
+import io.trygvis.jz14.db.NativeConnectionSupplier;
+import io.trygvis.jz14.db.NgConnectionSupplier;
+import org.slf4j.Logger;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class ListenerMain implements DbListener.NewItemCallback {
+ Logger log = getLogger(getClass());
+
+ public void main(boolean useNative) throws Exception {
+ DbListener listener;
+ if (useNative) {
+ Db db = new Db("listener");
+ listener = DbListener.nativeDbListener(
+ new DbListenerConfigForListenerMain("mail_raw"),
+ this, new NativeConnectionSupplier(db.dataSource())
+ );
+ } else {
+ Db db = new Db("listener", true);
+ listener = DbListener.ngDbListener(
+ new DbListenerConfigForListenerMain("mail_raw"),
+ this, new NgConnectionSupplier(db.dataSource())
+ );
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ listener.close();
+ }
+ });
+
+ while (true) {
+ Thread.sleep(Long.MAX_VALUE);
+ }
+ }
+
+ @Override
+ public void newItem(boolean wasNotified, Iterable<String> parameters) throws Exception {
+ log.info("ListenerMain.newItem: wasNotified={}, parameters={}", wasNotified, parameters);
+ }
+
+ private class DbListenerConfigForListenerMain extends DbListenerConfig {
+ public DbListenerConfigForListenerMain(String name) {
+ super(name);
+ listenerFailureSleepInterval = ms(5, SECONDS);
+ listenerConnectionLiveCheckInterval = ms(5, SECONDS);
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java b/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java
new file mode 100644
index 0000000..03f1c51
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java
@@ -0,0 +1,8 @@
+package io.trygvis.jz14.demo;
+
+public class NativeListenerMain {
+ public static void main(String[] args) throws Exception {
+ new ListenerMain().main(true);
+ }
+}
+
diff --git a/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java b/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java
new file mode 100644
index 0000000..0ea79df
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java
@@ -0,0 +1,7 @@
+package io.trygvis.jz14.demo;
+
+public class NgListenerMain {
+ public static void main(String[] args) throws Exception {
+ new ListenerMain().main(false);
+ }
+}
diff --git a/src/main/java/io/trygvis/jz14/demo/NotifierMain.java b/src/main/java/io/trygvis/jz14/demo/NotifierMain.java
new file mode 100644
index 0000000..12dad41
--- /dev/null
+++ b/src/main/java/io/trygvis/jz14/demo/NotifierMain.java
@@ -0,0 +1,34 @@
+package io.trygvis.jz14.demo;
+
+import org.slf4j.Logger;
+
+import java.sql.Connection;
+import java.util.Random;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class NotifierMain {
+ Logger log = getLogger(getClass());
+ Db db = new Db("notifier");
+ Random r = new Random();
+
+ public static void main(String[] args) throws Exception {
+ new NotifierMain().main();
+ }
+
+ public void main() throws Exception {
+ Connection c = db.getConnection();
+// c.setAutoCommit(false);
+
+ int count = 1 + r.nextInt(9);
+
+ for (int i = 0; i < count; i++) {
+ c.createStatement().execute("NOTIFY mail_raw, '" + i + "';");
+ }
+// c.commit();
+
+ log.info("NOTIFY performed, count={}", count);
+
+ c.close();
+ }
+}
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
new file mode 100644
index 0000000..31e60d0
--- /dev/null
+++ b/src/main/resources/logback.xml
@@ -0,0 +1,15 @@
+<configuration debug="false" scan="true" scanPeriod="1 second">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level [%-17thread] %-20logger{0} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="com.googlecode.flyway.core.dbsupport.SqlScript" level="INFO"/>
+ <logger name="org.eclipse.jetty" level="INFO"/>
+
+ <root level="DEBUG">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>