From d6989f1e54104d09b8af6d22cf46ea4f6fc5f4dc Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 10 Sep 2014 00:12:30 +0200 Subject: o Initial import of postgresql LISTEN/NOTIFY code. --- src/main/java/io/trygvis/jz14/demo/Db.java | 67 ++++++++++++++++++++++ .../java/io/trygvis/jz14/demo/InserterMain.java | 37 ++++++++++++ .../io/trygvis/jz14/demo/KillListenerMain.java | 58 +++++++++++++++++++ .../java/io/trygvis/jz14/demo/ListenerMain.java | 55 ++++++++++++++++++ .../io/trygvis/jz14/demo/NativeListenerMain.java | 8 +++ .../java/io/trygvis/jz14/demo/NgListenerMain.java | 7 +++ .../java/io/trygvis/jz14/demo/NotifierMain.java | 34 +++++++++++ 7 files changed, 266 insertions(+) create mode 100644 src/main/java/io/trygvis/jz14/demo/Db.java create mode 100644 src/main/java/io/trygvis/jz14/demo/InserterMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/KillListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/ListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/NgListenerMain.java create mode 100644 src/main/java/io/trygvis/jz14/demo/NotifierMain.java (limited to 'src/main/java/io/trygvis/jz14/demo') diff --git a/src/main/java/io/trygvis/jz14/demo/Db.java b/src/main/java/io/trygvis/jz14/demo/Db.java new file mode 100644 index 0000000..9de8d7d --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/Db.java @@ -0,0 +1,67 @@ +package io.trygvis.jz14.demo; + +import com.impossibl.postgres.jdbc.PGDataSource; +import org.postgresql.ds.PGPoolingDataSource; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import static java.util.Optional.ofNullable; + +public class Db { + + public final String applicationName; + public final boolean ng; + + public Db(String applicationName) { + this.applicationName = applicationName; + ng = false; + } + + public Db(String applicationName, boolean ng) { + this.applicationName = applicationName; + this.ng = ng; + } + + public Connection getConnection() throws SQLException { + return DriverManager.getConnection(jdbcUrl(), jdbcUsername(), jdbcPassword()); + } + + private String jdbcPassword() { + return ofNullable(System.getenv("password")).orElseGet(() -> System.getProperty("user.name")); + } + + private String jdbcUsername() { + return ofNullable(System.getenv("username")).orElseGet(() -> System.getProperty("user.name")); + } + + private String jdbcDatabase() { + return ofNullable(System.getenv("database")).orElseGet(() -> System.getProperty("user.name")); + } + + private String jdbcUrl() { + if (ng) { + return "jdbc:pgsql://localhost/" + System.getProperty("user.name") + "?ApplicationName=" + applicationName; + } + return "jdbc:postgresql://localhost/" + System.getProperty("user.name") + "?ApplicationName=" + applicationName; + } + + public DataSource dataSource() throws SQLException { + if (!ng) { + PGPoolingDataSource ds = new PGPoolingDataSource(); + ds.setUrl(jdbcUrl()); + ds.setUser(jdbcUsername()); + ds.setPassword(jdbcPassword()); + return ds; + } else { + PGDataSource ds = new PGDataSource(); + ds.setHost("localhost"); + ds.setUser(jdbcUsername()); + ds.setPassword(jdbcPassword()); + ds.setDatabase(jdbcDatabase()); + return ds; + } + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/InserterMain.java b/src/main/java/io/trygvis/jz14/demo/InserterMain.java new file mode 100644 index 0000000..1192a09 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/InserterMain.java @@ -0,0 +1,37 @@ +package io.trygvis.jz14.demo; + +import org.slf4j.Logger; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Random; + +import static org.slf4j.LoggerFactory.getLogger; + +public class InserterMain { + Logger log = getLogger(getClass()); + Db db = new Db("inserter"); + Random r = new Random(); + + public static void main(String[] args) throws Exception { + new InserterMain().main(); + } + + public void main() throws Exception { + Connection c = db.getConnection(); + c.setAutoCommit(false); + + PreparedStatement stmt = c.prepareStatement("INSERT INTO mail_raw_t(raw) VALUES(?)"); + int count = 1 + r.nextInt(9); + + for (int i = 0; i < count; i++) { + stmt.setString(1, "mail #" + i); + stmt.execute(); + } + c.commit(); + + log.info("INSERT performed, count={}", count); + + c.close(); + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java b/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java new file mode 100644 index 0000000..6ea982a --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/KillListenerMain.java @@ -0,0 +1,58 @@ +package io.trygvis.jz14.demo; + +import io.trygvis.jz14.db.DbListener; +import org.slf4j.Logger; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Kills all connections from {@link io.trygvis.jz14.demo.ListenerMain}. + *

+ * Note that this is slightly different from a postgresql backend failing, as the client will be told that it was + * killed, but the effect in this code is the same. + */ +public class KillListenerMain implements DbListener.NewItemCallback { + Logger log = getLogger(getClass()); + Db db = new Db("killer"); + + public static void main(String[] args) throws Exception { + new KillListenerMain().main(); + } + + private void main() throws Exception { + Connection c = db.getConnection(); + Statement s = c.createStatement(); + + ResultSet rs = s.executeQuery("SELECT pid FROM pg_stat_activity WHERE usename=user AND application_name='listener'"); + if (!rs.next()) { + System.out.println("Couldn't find any listener"); + } else { + List pids = new ArrayList<>(); + + do { + int pid = rs.getInt("pid"); + pids.add(pid); + } while (rs.next()); + + for (Integer pid : pids) { + System.out.println("Killing " + pid); + ResultSet rs2 = s.executeQuery("select pg_terminate_backend(" + pid + ")"); + rs2.next(); + System.out.println("rs2.getBoolean(1) = " + rs2.getBoolean(1)); + } + } + + c.close(); + } + + @Override + public void newItem(boolean wasNotified, Iterable parameters) throws Exception { + log.info("ListenerMain.newItem: wasNotified={}, parameters={}", wasNotified, parameters); + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/ListenerMain.java b/src/main/java/io/trygvis/jz14/demo/ListenerMain.java new file mode 100644 index 0000000..82f7c27 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/ListenerMain.java @@ -0,0 +1,55 @@ +package io.trygvis.jz14.demo; + +import io.trygvis.jz14.db.DbListener; +import io.trygvis.jz14.db.DbListener.DbListenerConfig; +import io.trygvis.jz14.db.NativeConnectionSupplier; +import io.trygvis.jz14.db.NgConnectionSupplier; +import org.slf4j.Logger; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.slf4j.LoggerFactory.getLogger; + +public class ListenerMain implements DbListener.NewItemCallback { + Logger log = getLogger(getClass()); + + public void main(boolean useNative) throws Exception { + DbListener listener; + if (useNative) { + Db db = new Db("listener"); + listener = DbListener.nativeDbListener( + new DbListenerConfigForListenerMain("mail_raw"), + this, new NativeConnectionSupplier(db.dataSource()) + ); + } else { + Db db = new Db("listener", true); + listener = DbListener.ngDbListener( + new DbListenerConfigForListenerMain("mail_raw"), + this, new NgConnectionSupplier(db.dataSource()) + ); + } + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + listener.close(); + } + }); + + while (true) { + Thread.sleep(Long.MAX_VALUE); + } + } + + @Override + public void newItem(boolean wasNotified, Iterable parameters) throws Exception { + log.info("ListenerMain.newItem: wasNotified={}, parameters={}", wasNotified, parameters); + } + + private class DbListenerConfigForListenerMain extends DbListenerConfig { + public DbListenerConfigForListenerMain(String name) { + super(name); + listenerFailureSleepInterval = ms(5, SECONDS); + listenerConnectionLiveCheckInterval = ms(5, SECONDS); + } + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java b/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java new file mode 100644 index 0000000..03f1c51 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/NativeListenerMain.java @@ -0,0 +1,8 @@ +package io.trygvis.jz14.demo; + +public class NativeListenerMain { + public static void main(String[] args) throws Exception { + new ListenerMain().main(true); + } +} + diff --git a/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java b/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java new file mode 100644 index 0000000..0ea79df --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/NgListenerMain.java @@ -0,0 +1,7 @@ +package io.trygvis.jz14.demo; + +public class NgListenerMain { + public static void main(String[] args) throws Exception { + new ListenerMain().main(false); + } +} diff --git a/src/main/java/io/trygvis/jz14/demo/NotifierMain.java b/src/main/java/io/trygvis/jz14/demo/NotifierMain.java new file mode 100644 index 0000000..12dad41 --- /dev/null +++ b/src/main/java/io/trygvis/jz14/demo/NotifierMain.java @@ -0,0 +1,34 @@ +package io.trygvis.jz14.demo; + +import org.slf4j.Logger; + +import java.sql.Connection; +import java.util.Random; + +import static org.slf4j.LoggerFactory.getLogger; + +public class NotifierMain { + Logger log = getLogger(getClass()); + Db db = new Db("notifier"); + Random r = new Random(); + + public static void main(String[] args) throws Exception { + new NotifierMain().main(); + } + + public void main() throws Exception { + Connection c = db.getConnection(); +// c.setAutoCommit(false); + + int count = 1 + r.nextInt(9); + + for (int i = 0; i < count; i++) { + c.createStatement().execute("NOTIFY mail_raw, '" + i + "';"); + } +// c.commit(); + + log.info("NOTIFY performed, count={}", count); + + c.close(); + } +} -- cgit v1.2.3