diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2014-09-10 00:12:30 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2014-09-12 09:08:00 +0200 |
commit | d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc (patch) | |
tree | 2e9cd83b8a9425ea184b72c12721d7e08ea87d52 /src/main/java/io/trygvis/jz14/db | |
download | javazone-2014-d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc.tar.gz javazone-2014-d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc.tar.bz2 javazone-2014-d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc.tar.xz javazone-2014-d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc.zip |
Diffstat (limited to 'src/main/java/io/trygvis/jz14/db')
-rw-r--r-- | src/main/java/io/trygvis/jz14/db/DbListener.java | 223 | ||||
-rw-r--r-- | src/main/java/io/trygvis/jz14/db/Listener.java | 6 | ||||
-rw-r--r-- | src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java | 44 | ||||
-rw-r--r-- | src/main/java/io/trygvis/jz14/db/NativeListener.java | 157 | ||||
-rw-r--r-- | src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java | 52 | ||||
-rw-r--r-- | src/main/java/io/trygvis/jz14/db/NgListener.java | 126 | ||||
-rw-r--r-- | src/main/java/io/trygvis/jz14/db/RobustTimerTask.java | 47 |
7 files changed, 655 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/jz14/db/DbListener.java b/src/main/java/io/trygvis/jz14/db/DbListener.java new file mode 100644 index 0000000..f16f391 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/DbListener.java @@ -0,0 +1,223 @@ +package io.trygvis.jz14.db; + +import org.slf4j.Logger; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collections; +import java.util.Timer; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.RobustTimerTask.robustTimerTask; +import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.slf4j.LoggerFactory.getLogger; + +public class DbListener implements Closeable { + private final Logger log = getLogger(getClass()); + + private final DbListenerConfig config; + private final Timer timer; + + private final Listener listener; + + @FunctionalInterface + public static interface NewItemCallback { + void newItem(boolean wasNotified, Iterable<String> parameters) throws Exception; + } + + public static final class PostgresConnection<T> implements Closeable { + public final Connection sqlConnection; + public final T underlying; + + public PostgresConnection(Connection sqlConnection, T underlying) { + this.sqlConnection = sqlConnection; + this.underlying = underlying; + } + + @Override + public final void close() { + try { + sqlConnection.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + public abstract static class DbListenerConfig { + + /** + * Default scanner delay; 1 minute; + */ + public static final long DEFAULT_SCANNER_DELAY = ms(1, MINUTES); + + /** + * Default scanner period; 1 hour. + */ + public static final long DEFAULT_SCANNER_PERIOD = ms(1, HOURS); + + /** + * Default connection life check interval; 1 hour; + */ + public static final long DEFAULT_LISTENER_CONNECTION_LIVE_CHECK_INTERVAL = ms(1, HOURS); + + /** + * Default sleep time after failure; 1 second; + */ + public static final long DEFAULT_LISTENER_SLEEP_INTERVAL = ms(200, MILLISECONDS); + + /** + * Default sleep time after failure; 1 minute; + */ + public static final long DEFAULT_LISTENER_FAILURE_SLEEP_INTERVAL = ms(1, MINUTES); + + /** + * The name of the channel to listen on. + */ + public final String name; + + /** + * Should the scanner thread be started? + */ + public final boolean runScanner; + + /** + * Should the listener thread be started? + */ + public final boolean runListener; + + /** + * How long should the scanner wait after start before starting to scan. + */ + public final long scannerDelay; + + /** + * How often should the scanner wait between scans. + */ + public long scannerPeriod; + + /** + * How long should the listener sleep after each poll. + */ + public long listenerSleepInterval; + + /** + * How often should the listener do a live check of the connection. + */ + public long listenerConnectionLiveCheckInterval; + + /** + * How long should the listener sleep after a failure. + */ + public long listenerFailureSleepInterval; + + /** + * Configures a listener with the default parameters, + */ + public DbListenerConfig(String name) { + this(name, true, true, DEFAULT_SCANNER_DELAY, DEFAULT_SCANNER_PERIOD, + DEFAULT_LISTENER_SLEEP_INTERVAL, DEFAULT_LISTENER_CONNECTION_LIVE_CHECK_INTERVAL, DEFAULT_LISTENER_FAILURE_SLEEP_INTERVAL); + } + + public DbListenerConfig(String name, boolean runScanner, boolean runListener, long scannerDelay, long scannerPeriod, + long listenerSleepInterval, long listenerConnectionLiveCheckInterval, long listenerFailureSleepInterval) { + this.name = name; + this.runScanner = runScanner; + this.runListener = runListener; + this.scannerDelay = scannerDelay; + this.scannerPeriod = scannerPeriod; + this.listenerSleepInterval = listenerSleepInterval; + this.listenerConnectionLiveCheckInterval = listenerConnectionLiveCheckInterval; + this.listenerFailureSleepInterval = listenerFailureSleepInterval; + } + + public static long ms(long count, TimeUnit unit) { + return MILLISECONDS.convert(count, unit); + } + } + + public static DbListener nativeDbListener(DbListenerConfig config, NewItemCallback callable, + Supplier<PostgresConnection<org.postgresql.PGConnection>> connectionSupplier) throws SQLException { + return new DbListener(config, callable, connectionSupplier, null); + } + + public static DbListener ngDbListener(DbListenerConfig config, NewItemCallback callable, + Supplier<PostgresConnection<com.impossibl.postgres.api.jdbc.PGConnection>> connectionSupplier) throws SQLException { + return new DbListener(config, callable, null, connectionSupplier); + } + + private DbListener(DbListenerConfig config, NewItemCallback callable, + Supplier<PostgresConnection<org.postgresql.PGConnection>> nativeSup, + Supplier<PostgresConnection<com.impossibl.postgres.api.jdbc.PGConnection>> ngSup) throws SQLException { + this.config = config; + + log.info("DB Listener: {}", config.name); + log.info(" Run scanner: {}", config.runScanner); + log.info(" Run listener: {}", config.runListener); + + if (config.runScanner) { + timer = new Timer("Timer \"" + config.name + "\"", true); + + timer.schedule(robustTimerTask(new NewItemCallbackCallable<>(callable)), config.scannerDelay, config.scannerPeriod); + } else { + timer = null; + } + + if (config.runListener) { + try (PostgresConnection<?> pg = (nativeSup != null ? nativeSup : ngSup).get()) { + Connection c = pg.sqlConnection; + if (c.getMetaData().getDatabaseProductName().toLowerCase().contains("postgresql")) { + if (nativeSup != null) { + listener = new NativeListener(config, nativeSup, callable); + } else { + listener = new NgListener(config, ngSup, callable); + } + Thread thread = new Thread(listener, "LISTEN \"" + config.name + "\""); + thread.start(); + } else { + log.info("Mail listener is configured to run, but the database isn't a PostgreSQL database"); + listener = null; + } + } + } else { + listener = null; + } + } + + public void close() { + log.info("Stopping DB listener {}", config.name); + if (timer != null) { + timer.cancel(); + } + + if (listener != null) { + try { + listener.close(); + } catch (IOException e) { + log.warn("Exception while closing listener.", e); + } + } + } + + private class NewItemCallbackCallable<A> implements Callable<A> { + + private final NewItemCallback callback; + + private NewItemCallbackCallable(NewItemCallback callback) { + this.callback = callback; + } + + @Override + public A call() throws Exception { + callback.newItem(false, Collections.emptyList()); + + return null; + } + } +} diff --git a/src/main/java/io/trygvis/jz14/db/Listener.java b/src/main/java/io/trygvis/jz14/db/Listener.java new file mode 100644 index 0000000..4e8c7c1 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/Listener.java @@ -0,0 +1,6 @@ +package io.trygvis.jz14.db; + +import java.io.Closeable; + +public interface Listener extends Runnable, Closeable { +} diff --git a/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java b/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java new file mode 100644 index 0000000..d9db909 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java @@ -0,0 +1,44 @@ +package io.trygvis.jz14.db; + +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.postgresql.PGConnection; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.function.Supplier; + +public class NativeConnectionSupplier implements Supplier<PostgresConnection<PGConnection>> { + private final DataSource dataSource; + + public NativeConnectionSupplier(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public PostgresConnection<PGConnection> get() { + try { + Connection sqlConnection = dataSource.getConnection(); + PGConnection pgConnection = unwrap(sqlConnection); + return new PostgresConnection<>(sqlConnection, pgConnection); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static PGConnection unwrap(Connection c) { + if (c instanceof PGConnection) { + return (PGConnection) c; + } + + /* If you're using Spring, you need to add these two to properly unwrap the underlying PGConnection. + if (c instanceof ConnectionHandle) { + return unwrap(((ConnectionHandle) c).getInternalConnection()); + } + if (c instanceof ConnectionProxy) { + return unwrap(DataSourceUtils.getTargetConnection(c)); + } + */ + throw new RuntimeException("Could not unwrap connection to a PGConnection: " + c.getClass()); + } +} diff --git a/src/main/java/io/trygvis/jz14/db/NativeListener.java b/src/main/java/io/trygvis/jz14/db/NativeListener.java new file mode 100644 index 0000000..c44a7cc --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NativeListener.java @@ -0,0 +1,157 @@ +package io.trygvis.jz14.db; + +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; +import org.slf4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.DbListener.DbListenerConfig; +import static io.trygvis.jz14.db.DbListener.NewItemCallback; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static org.slf4j.LoggerFactory.getLogger; + +class NativeListener implements Listener { + private final Logger log = getLogger(getClass()); + + private final AtomicBoolean shouldRun = new AtomicBoolean(true); + private final DbListenerConfig config; + private final Supplier<PostgresConnection<PGConnection>> connectionSupplier; + private final NewItemCallback callback; + + private PostgresConnection<PGConnection> c; + private long lastLiveCheck; + + public NativeListener(DbListenerConfig config, Supplier<PostgresConnection<PGConnection>> connectionSupplier, NewItemCallback callback) { + this.config = config; + this.connectionSupplier = connectionSupplier; + this.callback = callback; + } + + public synchronized void close() { + shouldRun.set(false); + notifyAll(); + } + + @Override + public void run() { + while (shouldRun.get()) { + try { + doIt(); + } catch (Throwable e) { + log.error("doIt failed", e); + try { + sleep(config.listenerFailureSleepInterval); + } catch (InterruptedException ex2) { + if (!shouldRun.get()) { + log.error("Got interrupted.", ex2); + } + } + } + } + } + + private void doIt() throws Exception { + + if (c == null) { + log.debug("Connecting to database"); + + try { + c = connectAndListen(); + } catch (Throwable ex) { + onError("Unable to connect to database", ex); + return; + } + } + + long now = currentTimeMillis(); + + if (now - lastLiveCheck > config.listenerConnectionLiveCheckInterval) { + log.debug("Doing live check"); + try { + doLiveCheck(); + lastLiveCheck = now; + } catch (Exception ex) { + onError("Live check failed", ex); + return; + } + } + + PGNotification[] notifications; + try { + notifications = c.underlying.getNotifications(); + } catch (SQLException e) { + onError("Error while checking for notifications on connection.", e); + return; + } + + if (notifications != null) { + List<String> strings = new ArrayList<>(); + for (PGNotification notification : notifications) { + String parameter = notification.getParameter(); + if (parameter == null) { + continue; + } + + parameter = parameter.trim(); + + if (!parameter.isEmpty()) { + strings.add(parameter); + } + } + + log.debug("Got notification: parameters={}", strings); + + try { + callback.newItem(true, strings); + } catch (Exception ex) { + onError("Notification handler failed", ex); + return; + } + } + + sleep(config.listenerSleepInterval); + } + + private void onError(String msg, Throwable ex) throws InterruptedException { + log.info(msg, ex); + + // Do a last attempt at closing the connection, the connection pool might appreciate it. + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + log.info("Exception when closing connection after error", e); + } + } + c = null; + log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval); + sleep(config.listenerFailureSleepInterval); + log.info("Resuming work after failure"); + } + + private PostgresConnection<PGConnection> connectAndListen() throws SQLException { + PostgresConnection<PGConnection> c = connectionSupplier.get(); + c.sqlConnection.setAutoCommit(true); + + lastLiveCheck = currentTimeMillis(); + + PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\""); + s.execute(); + s.close(); + return c; + } + + private void doLiveCheck() throws SQLException { + PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1"); + s.executeQuery().close(); + s.close(); + } +} diff --git a/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java b/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java new file mode 100644 index 0000000..4006f1d --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java @@ -0,0 +1,52 @@ +package io.trygvis.jz14.db; + +import com.impossibl.postgres.api.jdbc.PGConnection; +import io.trygvis.jz14.db.DbListener.PostgresConnection; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.function.Supplier; + +public class NgConnectionSupplier implements Supplier<PostgresConnection<PGConnection>> { + private final DataSource dataSource; + + public NgConnectionSupplier(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public PostgresConnection<PGConnection> get() { + try { + Connection sqlConnection = dataSource.getConnection(); + PGConnection pgConnection = unwrap(sqlConnection); + return new PostgresConnection<>(sqlConnection, pgConnection); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static PGConnection unwrap(Connection c) { + if (c instanceof PGConnection) { + return (PGConnection) c; + } + + /* If you're using Spring, you need to add these two to properly unwrap the underlying PGConnection. + if (c instanceof ConnectionHandle) { + return unwrap(((ConnectionHandle) c).getInternalConnection()); + } + if (c instanceof ConnectionProxy) { + return unwrap(DataSourceUtils.getTargetConnection(c)); + } + */ + Class<? extends Connection> klass = c.getClass(); + + Class<?>[] interfaces = klass.getInterfaces(); + System.out.println("interfaces.length = " + interfaces.length); + for (Class<?> anInterface : interfaces) { + System.out.println("anInterface = " + anInterface); + } + + throw new RuntimeException("Could not unwrap connection to a PGConnection: " + c.getClass()); + } +} diff --git a/src/main/java/io/trygvis/jz14/db/NgListener.java b/src/main/java/io/trygvis/jz14/db/NgListener.java new file mode 100644 index 0000000..b875ea1 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NgListener.java @@ -0,0 +1,126 @@ +package io.trygvis.jz14.db; + +import com.impossibl.postgres.api.jdbc.PGConnection; +import com.impossibl.postgres.api.jdbc.PGNotificationListener; +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.slf4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.DbListener.DbListenerConfig; +import static io.trygvis.jz14.db.DbListener.NewItemCallback; +import static java.lang.Thread.sleep; +import static org.slf4j.LoggerFactory.getLogger; + +class NgListener implements Listener, PGNotificationListener { + private final Logger log = getLogger(getClass()); + + private final AtomicBoolean shouldRun = new AtomicBoolean(true); + private final DbListenerConfig config; + private final Supplier<PostgresConnection<PGConnection>> connectionSupplier; + private final NewItemCallback callback; + + private PostgresConnection<PGConnection> c; + + public NgListener(DbListenerConfig config, Supplier<PostgresConnection<PGConnection>> connectionSupplier, NewItemCallback callback) { + this.config = config; + this.connectionSupplier = connectionSupplier; + this.callback = callback; + } + + public synchronized void close() { + shouldRun.set(false); + notifyAll(); + } + + @Override + public void run() { + while (shouldRun.get()) { + try { + doIt(); + } catch (Throwable e) { + log.error("doIt failed", e); + try { + sleep(config.listenerFailureSleepInterval); + } catch (InterruptedException ex2) { + if (!shouldRun.get()) { + log.error("Got interrupted.", ex2); + } + } + } + } + } + + private void doIt() throws Exception { + + if (c == null) { + log.debug("Connecting to database"); + + try { + c = connectAndListen(); + } catch (Throwable ex) { + onError("Unable to connect to database", ex); + return; + } + } + + log.debug("Doing live check"); + try { + doLiveCheck(); + } catch (Exception ex) { + onError("Live check failed", ex); + return; + } + + sleep(config.listenerConnectionLiveCheckInterval); + } + + private void onError(String msg, Throwable ex) throws InterruptedException { + log.info(msg, ex); + + // Do a last attempt at closing the connection, the connection pool might appreciate it. + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + log.info("Exception when closing connection after error", e); + } + } + c = null; + log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval); + sleep(config.listenerFailureSleepInterval); + log.info("Resuming work after failure"); + } + + private PostgresConnection<PGConnection> connectAndListen() throws SQLException { + PostgresConnection<PGConnection> c = connectionSupplier.get(); + c.sqlConnection.setAutoCommit(true); + + c.underlying.addNotificationListener(this); + + PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\""); + s.execute(); + s.close(); + + return c; + } + + private void doLiveCheck() throws SQLException { + PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1"); + s.executeQuery().close(); + s.close(); + } + + @Override + public void notification(int processId, String channelName, String payload) { + try { + callback.newItem(true, Collections.singletonList(payload)); + } catch (Exception e) { + log.warn("Exception while processing callback, payload=" + payload, e); + } + } +} diff --git a/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java b/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java new file mode 100644 index 0000000..af55aa7 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java @@ -0,0 +1,47 @@ +package io.trygvis.jz14.db; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.TimerTask; +import java.util.concurrent.Callable; + +public abstract class RobustTimerTask extends TimerTask { + private static final Logger log = LoggerFactory.getLogger(RobustTimerTask.class); + + private final Class<?> klass; + + private RobustTimerTask(Class<?> klass) { + this.klass = klass; + } + + protected RobustTimerTask() { + this.klass = getClass(); + } + + public static TimerTask robustTimerTask(final TimerTask timerTask) { + return new RobustTimerTask(timerTask.getClass()) { + public void timerRun() throws Exception { + timerTask.run(); + } + }; + } + + public static <T> TimerTask robustTimerTask(final Callable<T> callable) { + return new RobustTimerTask(callable.getClass()) { + public void timerRun() throws Exception { + callable.call(); + } + }; + } + + public void run() { + try { + timerRun(); + } catch (Exception e) { + log.error("Timer task " + klass.getName() + " failed.", e); + } + } + + public abstract void timerRun() throws Exception; +} |