From d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 10 Sep 2014 00:12:30 +0200 Subject: o Initial import of postgresql LISTEN/NOTIFY code. --- .gitignore | 3 + LICENSE.txt | 21 ++ init.sql | 37 ++++ pom.xml | 55 +++++ src/main/java/io/trygvis/jz14/db/DbListener.java | 223 +++++++++++++++++++++ src/main/java/io/trygvis/jz14/db/Listener.java | 6 + .../trygvis/jz14/db/NativeConnectionSupplier.java | 44 ++++ .../java/io/trygvis/jz14/db/NativeListener.java | 157 +++++++++++++++ .../io/trygvis/jz14/db/NgConnectionSupplier.java | 52 +++++ src/main/java/io/trygvis/jz14/db/NgListener.java | 126 ++++++++++++ .../java/io/trygvis/jz14/db/RobustTimerTask.java | 47 +++++ src/main/java/io/trygvis/jz14/demo/Db.java | 67 +++++++ .../java/io/trygvis/jz14/demo/InserterMain.java | 37 ++++ .../io/trygvis/jz14/demo/KillListenerMain.java | 58 ++++++ .../java/io/trygvis/jz14/demo/ListenerMain.java | 55 +++++ .../io/trygvis/jz14/demo/NativeListenerMain.java | 8 + .../java/io/trygvis/jz14/demo/NgListenerMain.java | 7 + .../java/io/trygvis/jz14/demo/NotifierMain.java | 34 ++++ src/main/resources/logback.xml | 15 ++ 19 files changed, 1052 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE.txt create mode 100644 init.sql create mode 100644 pom.xml create mode 100644 src/main/java/io/trygvis/jz14/db/DbListener.java create mode 100644 src/main/java/io/trygvis/jz14/db/Listener.java create mode 100644 src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java create mode 100644 src/main/java/io/trygvis/jz14/db/NativeListener.java create mode 100644 src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java create mode 100644 src/main/java/io/trygvis/jz14/db/NgListener.java create mode 100644 src/main/java/io/trygvis/jz14/db/RobustTimerTask.java create mode 100644 src/main/java/io/trygvis/jz14/demo/Db.java create mode 100644 src/main/java/io/trygvis/jz14/demo/InserterMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/KillListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/ListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/NgListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/NotifierMain.java create mode 100644 src/main/resources/logback.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f83e8cf --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +target +*.iml diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..4dc9c3c --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) <2014> Trygve Laugstøl + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..0ec0477 --- /dev/null +++ b/init.sql @@ -0,0 +1,37 @@ +-- Create a table for incoming mail + +CREATE TABLE mail_raw ( + id SERIAL PRIMARY KEY, + raw TEXT +); + +-- Insert a row into the table; + +INSERT INTO mail_raw (raw) VALUES ('my mail'); +NOTIFY mail_raw; + +-- +-- Alternate technique with implicit notifies through triggers +-- + +CREATE TABLE mail_raw_t ( + id SERIAL PRIMARY KEY, + raw TEXT +); + +-- This trigger function does the notification +CREATE FUNCTION notify_trigger() + RETURNS TRIGGER AS $$ +BEGIN + PERFORM pg_notify(TG_TABLE_NAME, NEW.id :: TEXT); + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +-- Create a trigger on the mail_raw_t table and make it run on each new row +CREATE TRIGGER mail_raw_t_notify +AFTER INSERT ON mail_raw_t +FOR EACH ROW +EXECUTE PROCEDURE notify_trigger(); + +INSERT INTO mail_raw_t (raw) VALUES ('my mail'); diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c0a5ad5 --- /dev/null +++ b/pom.xml @@ -0,0 +1,55 @@ + + 4.0.0 + + io.trygvis + trygvis-parent + 1 + + io.trygvis.javazone-2014 + javazone-2014 + 1.0-SNAPSHOT + + + 1.8 + 1.8 + UTF-8 + + + + + org.postgresql + postgresql + 9.3-1102-jdbc41 + + + + com.impossibl.pgjdbc-ng + pgjdbc-ng + 0.4 + + + + org.slf4j + slf4j-api + 1.7.7 + + + + ch.qos.logback + logback-classic + 1.1.2 + + + + + + + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.target} + + + + + diff --git a/src/main/java/io/trygvis/jz14/db/DbListener.java b/src/main/java/io/trygvis/jz14/db/DbListener.java new file mode 100644 index 0000000..f16f391 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/DbListener.java @@ -0,0 +1,223 @@ +package io.trygvis.jz14.db; + +import org.slf4j.Logger; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collections; +import java.util.Timer; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.RobustTimerTask.robustTimerTask; +import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.slf4j.LoggerFactory.getLogger; + +public class DbListener implements Closeable { + private final Logger log = getLogger(getClass()); + + private final DbListenerConfig config; + private final Timer timer; + + private final Listener listener; + + @FunctionalInterface + public static interface NewItemCallback { + void newItem(boolean wasNotified, Iterable parameters) throws Exception; + } + + public static final class PostgresConnection implements Closeable { + public final Connection sqlConnection; + public final T underlying; + + public PostgresConnection(Connection sqlConnection, T underlying) { + this.sqlConnection = sqlConnection; + this.underlying = underlying; + } + + @Override + public final void close() { + try { + sqlConnection.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + public abstract static class DbListenerConfig { + + /** + * Default scanner delay; 1 minute; + */ + public static final long DEFAULT_SCANNER_DELAY = ms(1, MINUTES); + + /** + * Default scanner period; 1 hour. + */ + public static final long DEFAULT_SCANNER_PERIOD = ms(1, HOURS); + + /** + * Default connection life check interval; 1 hour; + */ + public static final long DEFAULT_LISTENER_CONNECTION_LIVE_CHECK_INTERVAL = ms(1, HOURS); + + /** + * Default sleep time after failure; 1 second; + */ + public static final long DEFAULT_LISTENER_SLEEP_INTERVAL = ms(200, MILLISECONDS); + + /** + * Default sleep time after failure; 1 minute; + */ + public static final long DEFAULT_LISTENER_FAILURE_SLEEP_INTERVAL = ms(1, MINUTES); + + /** + * The name of the channel to listen on. + */ + public final String name; + + /** + * Should the scanner thread be started? + */ + public final boolean runScanner; + + /** + * Should the listener thread be started? + */ + public final boolean runListener; + + /** + * How long should the scanner wait after start before starting to scan. + */ + public final long scannerDelay; + + /** + * How often should the scanner wait between scans. + */ + public long scannerPeriod; + + /** + * How long should the listener sleep after each poll. + */ + public long listenerSleepInterval; + + /** + * How often should the listener do a live check of the connection. + */ + public long listenerConnectionLiveCheckInterval; + + /** + * How long should the listener sleep after a failure. + */ + public long listenerFailureSleepInterval; + + /** + * Configures a listener with the default parameters, + */ + public DbListenerConfig(String name) { + this(name, true, true, DEFAULT_SCANNER_DELAY, DEFAULT_SCANNER_PERIOD, + DEFAULT_LISTENER_SLEEP_INTERVAL, DEFAULT_LISTENER_CONNECTION_LIVE_CHECK_INTERVAL, DEFAULT_LISTENER_FAILURE_SLEEP_INTERVAL); + } + + public DbListenerConfig(String name, boolean runScanner, boolean runListener, long scannerDelay, long scannerPeriod, + long listenerSleepInterval, long listenerConnectionLiveCheckInterval, long listenerFailureSleepInterval) { + this.name = name; + this.runScanner = runScanner; + this.runListener = runListener; + this.scannerDelay = scannerDelay; + this.scannerPeriod = scannerPeriod; + this.listenerSleepInterval = listenerSleepInterval; + this.listenerConnectionLiveCheckInterval = listenerConnectionLiveCheckInterval; + this.listenerFailureSleepInterval = listenerFailureSleepInterval; + } + + public static long ms(long count, TimeUnit unit) { + return MILLISECONDS.convert(count, unit); + } + } + + public static DbListener nativeDbListener(DbListenerConfig config, NewItemCallback callable, + Supplier> connectionSupplier) throws SQLException { + return new DbListener(config, callable, connectionSupplier, null); + } + + public static DbListener ngDbListener(DbListenerConfig config, NewItemCallback callable, + Supplier> connectionSupplier) throws SQLException { + return new DbListener(config, callable, null, connectionSupplier); + } + + private DbListener(DbListenerConfig config, NewItemCallback callable, + Supplier> nativeSup, + Supplier> ngSup) throws SQLException { + this.config = config; + + log.info("DB Listener: {}", config.name); + log.info(" Run scanner: {}", config.runScanner); + log.info(" Run listener: {}", config.runListener); + + if (config.runScanner) { + timer = new Timer("Timer \"" + config.name + "\"", true); + + timer.schedule(robustTimerTask(new NewItemCallbackCallable<>(callable)), config.scannerDelay, config.scannerPeriod); + } else { + timer = null; + } + + if (config.runListener) { + try (PostgresConnection pg = (nativeSup != null ? nativeSup : ngSup).get()) { + Connection c = pg.sqlConnection; + if (c.getMetaData().getDatabaseProductName().toLowerCase().contains("postgresql")) { + if (nativeSup != null) { + listener = new NativeListener(config, nativeSup, callable); + } else { + listener = new NgListener(config, ngSup, callable); + } + Thread thread = new Thread(listener, "LISTEN \"" + config.name + "\""); + thread.start(); + } else { + log.info("Mail listener is configured to run, but the database isn't a PostgreSQL database"); + listener = null; + } + } + } else { + listener = null; + } + } + + public void close() { + log.info("Stopping DB listener {}", config.name); + if (timer != null) { + timer.cancel(); + } + + if (listener != null) { + try { + listener.close(); + } catch (IOException e) { + log.warn("Exception while closing listener.", e); + } + } + } + + private class NewItemCallbackCallable implements Callable { + + private final NewItemCallback callback; + + private NewItemCallbackCallable(NewItemCallback callback) { + this.callback = callback; + } + + @Override + public A call() throws Exception { + callback.newItem(false, Collections.emptyList()); + + return null; + } + } +} diff --git a/src/main/java/io/trygvis/jz14/db/Listener.java b/src/main/java/io/trygvis/jz14/db/Listener.java new file mode 100644 index 0000000..4e8c7c1 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/Listener.java @@ -0,0 +1,6 @@ +package io.trygvis.jz14.db; + +import java.io.Closeable; + +public interface Listener extends Runnable, Closeable { +} diff --git a/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java b/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java new file mode 100644 index 0000000..d9db909 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NativeConnectionSupplier.java @@ -0,0 +1,44 @@ +package io.trygvis.jz14.db; + +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.postgresql.PGConnection; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.function.Supplier; + +public class NativeConnectionSupplier implements Supplier> { + private final DataSource dataSource; + + public NativeConnectionSupplier(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public PostgresConnection get() { + try { + Connection sqlConnection = dataSource.getConnection(); + PGConnection pgConnection = unwrap(sqlConnection); + return new PostgresConnection<>(sqlConnection, pgConnection); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static PGConnection unwrap(Connection c) { + if (c instanceof PGConnection) { + return (PGConnection) c; + } + + /* If you're using Spring, you need to add these two to properly unwrap the underlying PGConnection. + if (c instanceof ConnectionHandle) { + return unwrap(((ConnectionHandle) c).getInternalConnection()); + } + if (c instanceof ConnectionProxy) { + return unwrap(DataSourceUtils.getTargetConnection(c)); + } + */ + throw new RuntimeException("Could not unwrap connection to a PGConnection: " + c.getClass()); + } +} diff --git a/src/main/java/io/trygvis/jz14/db/NativeListener.java b/src/main/java/io/trygvis/jz14/db/NativeListener.java new file mode 100644 index 0000000..c44a7cc --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NativeListener.java @@ -0,0 +1,157 @@ +package io.trygvis.jz14.db; + +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; +import org.slf4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.DbListener.DbListenerConfig; +import static io.trygvis.jz14.db.DbListener.NewItemCallback; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static org.slf4j.LoggerFactory.getLogger; + +class NativeListener implements Listener { + private final Logger log = getLogger(getClass()); + + private final AtomicBoolean shouldRun = new AtomicBoolean(true); + private final DbListenerConfig config; + private final Supplier> connectionSupplier; + private final NewItemCallback callback; + + private PostgresConnection c; + private long lastLiveCheck; + + public NativeListener(DbListenerConfig config, Supplier> connectionSupplier, NewItemCallback callback) { + this.config = config; + this.connectionSupplier = connectionSupplier; + this.callback = callback; + } + + public synchronized void close() { + shouldRun.set(false); + notifyAll(); + } + + @Override + public void run() { + while (shouldRun.get()) { + try { + doIt(); + } catch (Throwable e) { + log.error("doIt failed", e); + try { + sleep(config.listenerFailureSleepInterval); + } catch (InterruptedException ex2) { + if (!shouldRun.get()) { + log.error("Got interrupted.", ex2); + } + } + } + } + } + + private void doIt() throws Exception { + + if (c == null) { + log.debug("Connecting to database"); + + try { + c = connectAndListen(); + } catch (Throwable ex) { + onError("Unable to connect to database", ex); + return; + } + } + + long now = currentTimeMillis(); + + if (now - lastLiveCheck > config.listenerConnectionLiveCheckInterval) { + log.debug("Doing live check"); + try { + doLiveCheck(); + lastLiveCheck = now; + } catch (Exception ex) { + onError("Live check failed", ex); + return; + } + } + + PGNotification[] notifications; + try { + notifications = c.underlying.getNotifications(); + } catch (SQLException e) { + onError("Error while checking for notifications on connection.", e); + return; + } + + if (notifications != null) { + List strings = new ArrayList<>(); + for (PGNotification notification : notifications) { + String parameter = notification.getParameter(); + if (parameter == null) { + continue; + } + + parameter = parameter.trim(); + + if (!parameter.isEmpty()) { + strings.add(parameter); + } + } + + log.debug("Got notification: parameters={}", strings); + + try { + callback.newItem(true, strings); + } catch (Exception ex) { + onError("Notification handler failed", ex); + return; + } + } + + sleep(config.listenerSleepInterval); + } + + private void onError(String msg, Throwable ex) throws InterruptedException { + log.info(msg, ex); + + // Do a last attempt at closing the connection, the connection pool might appreciate it. + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + log.info("Exception when closing connection after error", e); + } + } + c = null; + log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval); + sleep(config.listenerFailureSleepInterval); + log.info("Resuming work after failure"); + } + + private PostgresConnection connectAndListen() throws SQLException { + PostgresConnection c = connectionSupplier.get(); + c.sqlConnection.setAutoCommit(true); + + lastLiveCheck = currentTimeMillis(); + + PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\""); + s.execute(); + s.close(); + return c; + } + + private void doLiveCheck() throws SQLException { + PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1"); + s.executeQuery().close(); + s.close(); + } +} diff --git a/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java b/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java new file mode 100644 index 0000000..4006f1d --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NgConnectionSupplier.java @@ -0,0 +1,52 @@ +package io.trygvis.jz14.db; + +import com.impossibl.postgres.api.jdbc.PGConnection; +import io.trygvis.jz14.db.DbListener.PostgresConnection; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.function.Supplier; + +public class NgConnectionSupplier implements Supplier> { + private final DataSource dataSource; + + public NgConnectionSupplier(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public PostgresConnection get() { + try { + Connection sqlConnection = dataSource.getConnection(); + PGConnection pgConnection = unwrap(sqlConnection); + return new PostgresConnection<>(sqlConnection, pgConnection); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static PGConnection unwrap(Connection c) { + if (c instanceof PGConnection) { + return (PGConnection) c; + } + + /* If you're using Spring, you need to add these two to properly unwrap the underlying PGConnection. + if (c instanceof ConnectionHandle) { + return unwrap(((ConnectionHandle) c).getInternalConnection()); + } + if (c instanceof ConnectionProxy) { + return unwrap(DataSourceUtils.getTargetConnection(c)); + } + */ + Class klass = c.getClass(); + + Class[] interfaces = klass.getInterfaces(); + System.out.println("interfaces.length = " + interfaces.length); + for (Class anInterface : interfaces) { + System.out.println("anInterface = " + anInterface); + } + + throw new RuntimeException("Could not unwrap connection to a PGConnection: " + c.getClass()); + } +} diff --git a/src/main/java/io/trygvis/jz14/db/NgListener.java b/src/main/java/io/trygvis/jz14/db/NgListener.java new file mode 100644 index 0000000..b875ea1 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/NgListener.java @@ -0,0 +1,126 @@ +package io.trygvis.jz14.db; + +import com.impossibl.postgres.api.jdbc.PGConnection; +import com.impossibl.postgres.api.jdbc.PGNotificationListener; +import io.trygvis.jz14.db.DbListener.PostgresConnection; +import org.slf4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static io.trygvis.jz14.db.DbListener.DbListenerConfig; +import static io.trygvis.jz14.db.DbListener.NewItemCallback; +import static java.lang.Thread.sleep; +import static org.slf4j.LoggerFactory.getLogger; + +class NgListener implements Listener, PGNotificationListener { + private final Logger log = getLogger(getClass()); + + private final AtomicBoolean shouldRun = new AtomicBoolean(true); + private final DbListenerConfig config; + private final Supplier> connectionSupplier; + private final NewItemCallback callback; + + private PostgresConnection c; + + public NgListener(DbListenerConfig config, Supplier> connectionSupplier, NewItemCallback callback) { + this.config = config; + this.connectionSupplier = connectionSupplier; + this.callback = callback; + } + + public synchronized void close() { + shouldRun.set(false); + notifyAll(); + } + + @Override + public void run() { + while (shouldRun.get()) { + try { + doIt(); + } catch (Throwable e) { + log.error("doIt failed", e); + try { + sleep(config.listenerFailureSleepInterval); + } catch (InterruptedException ex2) { + if (!shouldRun.get()) { + log.error("Got interrupted.", ex2); + } + } + } + } + } + + private void doIt() throws Exception { + + if (c == null) { + log.debug("Connecting to database"); + + try { + c = connectAndListen(); + } catch (Throwable ex) { + onError("Unable to connect to database", ex); + return; + } + } + + log.debug("Doing live check"); + try { + doLiveCheck(); + } catch (Exception ex) { + onError("Live check failed", ex); + return; + } + + sleep(config.listenerConnectionLiveCheckInterval); + } + + private void onError(String msg, Throwable ex) throws InterruptedException { + log.info(msg, ex); + + // Do a last attempt at closing the connection, the connection pool might appreciate it. + if (c != null) { + try { + c.close(); + } catch (Throwable e) { + log.info("Exception when closing connection after error", e); + } + } + c = null; + log.info("Sleeping for {} ms after failure", config.listenerFailureSleepInterval); + sleep(config.listenerFailureSleepInterval); + log.info("Resuming work after failure"); + } + + private PostgresConnection connectAndListen() throws SQLException { + PostgresConnection c = connectionSupplier.get(); + c.sqlConnection.setAutoCommit(true); + + c.underlying.addNotificationListener(this); + + PreparedStatement s = c.sqlConnection.prepareStatement("LISTEN \"" + config.name + "\""); + s.execute(); + s.close(); + + return c; + } + + private void doLiveCheck() throws SQLException { + PreparedStatement s = c.sqlConnection.prepareStatement("SELECT 1"); + s.executeQuery().close(); + s.close(); + } + + @Override + public void notification(int processId, String channelName, String payload) { + try { + callback.newItem(true, Collections.singletonList(payload)); + } catch (Exception e) { + log.warn("Exception while processing callback, payload=" + payload, e); + } + } +} diff --git a/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java b/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java new file mode 100644 index 0000000..af55aa7 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/db/RobustTimerTask.java @@ -0,0 +1,47 @@ +package io.trygvis.jz14.db; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.TimerTask; +import java.util.concurrent.Callable; + +public abstract class RobustTimerTask extends TimerTask { + private static final Logger log = LoggerFactory.getLogger(RobustTimerTask.class); + + private final Class klass; + + private RobustTimerTask(Class klass) { + this.klass = klass; + } + + protected RobustTimerTask() { + this.klass = getClass(); + } + + public static TimerTask robustTimerTask(final TimerTask timerTask) { + return new RobustTimerTask(timerTask.getClass()) { + public void timerRun() throws Exception { + timerTask.run(); + } + }; + } + + public static TimerTask robustTimerTask(final Callable callable) { + return new RobustTimerTask(callable.getClass()) { + public void timerRun() throws Exception { + callable.call(); + } + }; + } + + public void run() { + try { + timerRun(); + } catch (Exception e) { + log.error("Timer task " + klass.getName() + " failed.", e); + } + } + + public abstract void timerRun() throws Exception; +} diff --git a/src/main/java/io/trygvis/jz14/demo/Db.java b/src/main/java/io/trygvis/jz14/demo/Db.java new file mode 100644 index 0000000..9de8d7d --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/Db.java @@ -0,0 +1,67 @@ +package io.trygvis.jz14.demo; + +import com.impossibl.postgres.jdbc.PGDataSource; +import org.postgresql.ds.PGPoolingDataSource; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import static java.util.Optional.ofNullable; + +public class Db { + + public final String applicationName; + public final boolean ng; + + public Db(String applicationName) { + this.applicationName = applicationName; + ng = false; + } + + public Db(String applicationName, boolean ng) { + this.applicationName = applicationName; + this.ng = ng; + } + + public Connection getConnection() throws SQLException { + return DriverManager.getConnection(jdbcUrl(), jdbcUsername(), jdbcPassword()); + } + + private String jdbcPassword() { + return ofNullable(System.getenv("password")).orElseGet(() -> System.getProperty("user.name")); + } + + private String jdbcUsername() { + return ofNullable(System.getenv("username")).orElseGet(() -> System.getProperty("user.name")); + } + + private String jdbcDatabase() { + return ofNullable(System.getenv("database")).orElseGet(() -> System.getProperty("user.name")); + } + + private String jdbcUrl() { + if (ng) { + return "jdbc:pgsql://localhost/" + System.getProperty("user.name") + "?ApplicationName=" + applicationName; + } + return "jdbc:postgresql://localhost/" + System.getProperty("user.name") + "?ApplicationName=" + applicationName; + } + + public DataSource dataSource() throws SQLException { + if (!ng) { + PGPoolingDataSource ds = new PGPoolingDataSource(); + ds.setUrl(jdbcUrl()); + ds.setUser(jdbcUsername()); + ds.setPassword(jdbcPassword()); + return ds; + } else { + PGDataSource ds = new PGDataSource(); + ds.setHost("localhost"); + ds.setUser(jdbcUsername()); + ds.setPassword(jdbcPassword()); + ds.setDatabase(jdbcDatabase()); + return ds; + } + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/InserterMain.java b/src/main/java/io/trygvis/jz14/demo/InserterMain.java new file mode 100644 index 0000000..1192a09 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/InserterMain.java @@ -0,0 +1,37 @@ +package io.trygvis.jz14.demo; + +import org.slf4j.Logger; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Random; + +import static org.slf4j.LoggerFactory.getLogger; + +public class InserterMain { + Logger log = getLogger(getClass()); + Db db = new Db("inserter"); + Random r = new Random(); + + public static void main(String[] args) throws Exception { + new InserterMain().main(); + } + + public void main() throws Exception { + Connection c = db.getConnection(); + c.setAutoCommit(false); + + PreparedStatement stmt = c.prepareStatement("INSERT INTO mail_raw_t(raw) VALUES(?)"); + int count = 1 + r.nextInt(9); + + for (int i = 0; i < count; i++) { + stmt.setString(1, "mail #" + i); + stmt.execute(); + } + c.commit(); + + log.info("INSERT performed, count={}", count); + + c.close(); + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java b/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java new file mode 100644 index 0000000..6ea982a --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java @@ -0,0 +1,58 @@ +package io.trygvis.jz14.demo; + +import io.trygvis.jz14.db.DbListener; +import org.slf4j.Logger; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Kills all connections from {@link io.trygvis.jz14.demo.ListenerMain}. + *

+ * Note that this is slightly different from a postgresql backend failing, as the client will be told that it was + * killed, but the effect in this code is the same. + */ +public class KillListenerMain implements DbListener.NewItemCallback { + Logger log = getLogger(getClass()); + Db db = new Db("killer"); + + public static void main(String[] args) throws Exception { + new KillListenerMain().main(); + } + + private void main() throws Exception { + Connection c = db.getConnection(); + Statement s = c.createStatement(); + + ResultSet rs = s.executeQuery("SELECT pid FROM pg_stat_activity WHERE usename=user AND application_name='listener'"); + if (!rs.next()) { + System.out.println("Couldn't find any listener"); + } else { + List pids = new ArrayList<>(); + + do { + int pid = rs.getInt("pid"); + pids.add(pid); + } while (rs.next()); + + for (Integer pid : pids) { + System.out.println("Killing " + pid); + ResultSet rs2 = s.executeQuery("select pg_terminate_backend(" + pid + ")"); + rs2.next(); + System.out.println("rs2.getBoolean(1) = " + rs2.getBoolean(1)); + } + } + + c.close(); + } + + @Override + public void newItem(boolean wasNotified, Iterable parameters) throws Exception { + log.info("ListenerMain.newItem: wasNotified={}, parameters={}", wasNotified, parameters); + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/ListenerMain.java b/src/main/java/io/trygvis/jz14/demo/ListenerMain.java new file mode 100644 index 0000000..82f7c27 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/ListenerMain.java @@ -0,0 +1,55 @@ +package io.trygvis.jz14.demo; + +import io.trygvis.jz14.db.DbListener; +import io.trygvis.jz14.db.DbListener.DbListenerConfig; +import io.trygvis.jz14.db.NativeConnectionSupplier; +import io.trygvis.jz14.db.NgConnectionSupplier; +import org.slf4j.Logger; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.slf4j.LoggerFactory.getLogger; + +public class ListenerMain implements DbListener.NewItemCallback { + Logger log = getLogger(getClass()); + + public void main(boolean useNative) throws Exception { + DbListener listener; + if (useNative) { + Db db = new Db("listener"); + listener = DbListener.nativeDbListener( + new DbListenerConfigForListenerMain("mail_raw"), + this, new NativeConnectionSupplier(db.dataSource()) + ); + } else { + Db db = new Db("listener", true); + listener = DbListener.ngDbListener( + new DbListenerConfigForListenerMain("mail_raw"), + this, new NgConnectionSupplier(db.dataSource()) + ); + } + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + listener.close(); + } + }); + + while (true) { + Thread.sleep(Long.MAX_VALUE); + } + } + + @Override + public void newItem(boolean wasNotified, Iterable parameters) throws Exception { + log.info("ListenerMain.newItem: wasNotified={}, parameters={}", wasNotified, parameters); + } + + private class DbListenerConfigForListenerMain extends DbListenerConfig { + public DbListenerConfigForListenerMain(String name) { + super(name); + listenerFailureSleepInterval = ms(5, SECONDS); + listenerConnectionLiveCheckInterval = ms(5, SECONDS); + } + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java b/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java new file mode 100644 index 0000000..03f1c51 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java @@ -0,0 +1,8 @@ +package io.trygvis.jz14.demo; + +public class NativeListenerMain { + public static void main(String[] args) throws Exception { + new ListenerMain().main(true); + } +} + diff --git a/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java b/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java new file mode 100644 index 0000000..0ea79df --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java @@ -0,0 +1,7 @@ +package io.trygvis.jz14.demo; + +public class NgListenerMain { + public static void main(String[] args) throws Exception { + new ListenerMain().main(false); + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/NotifierMain.java b/src/main/java/io/trygvis/jz14/demo/NotifierMain.java new file mode 100644 index 0000000..12dad41 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/NotifierMain.java @@ -0,0 +1,34 @@ +package io.trygvis.jz14.demo; + +import org.slf4j.Logger; + +import java.sql.Connection; +import java.util.Random; + +import static org.slf4j.LoggerFactory.getLogger; + +public class NotifierMain { + Logger log = getLogger(getClass()); + Db db = new Db("notifier"); + Random r = new Random(); + + public static void main(String[] args) throws Exception { + new NotifierMain().main(); + } + + public void main() throws Exception { + Connection c = db.getConnection(); +// c.setAutoCommit(false); + + int count = 1 + r.nextInt(9); + + for (int i = 0; i < count; i++) { + c.createStatement().execute("NOTIFY mail_raw, '" + i + "';"); + } +// c.commit(); + + log.info("NOTIFY performed, count={}", count); + + c.close(); + } +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..31e60d0 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,15 @@ + + + + + %d{HH:mm:ss.SSS} %-5level [%-17thread] %-20logger{0} - %msg%n + + + + + + + + + + -- cgit v1.2.3