From 1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 9 Jun 2013 23:22:39 +0200 Subject: wip --- .../java/io/trygvis/async/SqlEffectExecutor.java | 58 +++++++--------------- .../java/io/trygvis/queue/JdbcQueueService.java | 22 +++++--- src/main/java/io/trygvis/queue/TaskDao.java | 2 +- src/main/resources/create-postgresql.sql | 4 +- src/test/java/io/trygvis/test/DbUtil.java | 7 ++- .../java/io/trygvis/test/PlainJavaExample.java | 26 ++++++++-- src/test/resources/logback.xml | 6 ++- 7 files changed, 67 insertions(+), 58 deletions(-) diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java index e03cf5e..51ad31d 100644 --- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -2,7 +2,9 @@ package io.trygvis.async; import javax.sql.DataSource; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; public class SqlEffectExecutor { @@ -12,41 +14,13 @@ public class SqlEffectExecutor { this.dataSource = dataSource; } - public A transaction(SqlEffect effect) { -/* + public A transaction(SqlEffect effect) throws SQLException { int pid; try (Connection c = dataSource.getConnection()) { - - try (Statement statement = c.createStatement()) { - ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); - rs.next(); - pid = rs.getInt(1); - } - + pid = getPid(c); System.out.println("pid = " + pid); - try { - effect.doInConnection(c); - c.commit(); - } catch (SQLException e) { - c.rollback(); - e.printStackTrace(); - } finally { - System.out.println("Closing pid=" + pid); - try { - c.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } catch (SQLException e) { - e.printStackTrace(); - throw new SqlExecutionException(e); - } -*/ - - try (Connection c = dataSource.getConnection()) { boolean ok = false; try { A a = effect.doInConnection(c); @@ -54,16 +28,19 @@ public class SqlEffectExecutor { ok = true; return a; } finally { + System.out.println("Closing, pid = " + pid); if (!ok) { - c.rollback(); + try { + c.rollback(); + } catch (SQLException e) { + // ignore + } } } - } catch (SQLException e) { - throw new SqlExecutionException(e); } } - public void transaction(final SqlEffect.Void effect) { + public void transaction(final SqlEffect.Void effect) throws SQLException { transaction(new SqlEffect() { @Override public Object doInConnection(Connection c) throws SQLException { @@ -73,12 +50,13 @@ public class SqlEffectExecutor { }); } - public static class SqlExecutionException extends RuntimeException { - public final SQLException exception; - - public SqlExecutionException(SQLException ex) { - super(ex); - this.exception = ex; + private int getPid(Connection c) throws SQLException { + int pid; + try (Statement statement = c.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); + rs.next(); + pid = rs.getInt(1); } + return pid; } } diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index d284287..c99bf2e 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -26,7 +26,7 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, final TaskEffect effect) { + public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException { final List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { @@ -47,7 +47,7 @@ public class JdbcQueueService { }); } - public void executeTask(final TaskEffect taskEffect, final List tasks) { + public void executeTask(final TaskEffect taskEffect, final List tasks) throws SQLException { sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection connection) throws SQLException { @@ -102,12 +102,6 @@ public class JdbcQueueService { final Date now = new Date(); log.error("Unable to execute task, id=" + task.id(), e); - try { - taskDao.rollback(); - } catch (SQLException e2) { - log.error("Error rolling back transaction after failed apply.", e2); - } - final Task t = task; sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override @@ -117,6 +111,12 @@ public class JdbcQueueService { taskDao.update(task); } }); + + if(e instanceof SQLException) { + throw ((SQLException) e); + } + + throw new RuntimeException("Error while executing task, id=" + task.id(), e); } } @@ -156,4 +156,10 @@ public class JdbcQueueService { return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } + + public static class TaskExecutionFailed extends Throwable { + public TaskExecutionFailed(Exception e) { + super(e); + } + } } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 025823b..8b58585 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -111,7 +111,7 @@ public class TaskDao { rs.getLong(i++), rs.getLong(i++), rs.getString(i++), - valueOf(rs.getString(i++)), + TaskState.valueOf(rs.getString(i++).trim()), rs.getTimestamp(i++), rs.getTimestamp(i++), rs.getInt(i++), diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql index 4773a0c..7c331fd 100644 --- a/src/main/resources/create-postgresql.sql +++ b/src/main/resources/create-postgresql.sql @@ -14,7 +14,7 @@ CREATE TABLE task ( id BIGINT NOT NULL, parent BIGINT, queue VARCHAR(100) NOT NULL, - state VARCHAR(100) NOT NULL, + state CHAR(10) NOT NULL, scheduled TIMESTAMP NOT NULL, last_run TIMESTAMP, run_count INT NOT NULL, @@ -25,6 +25,8 @@ CREATE TABLE task ( CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id) ); +CREATE INDEX ix_task__queue__state ON task (queue, state); + CREATE SEQUENCE task_seq; COMMIT; diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java index 2362e65..a2c41d3 100644 --- a/src/test/java/io/trygvis/test/DbUtil.java +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -5,7 +5,6 @@ import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; import javax.sql.DataSource; - import java.io.PrintWriter; import java.sql.SQLException; @@ -34,7 +33,7 @@ public class DbUtil { ds.setDefaultAutoCommit(false); ds.setIdleConnectionTestPeriodInSeconds(60); ds.setIdleMaxAgeInSeconds(240); - ds.setMaxConnectionsPerPartition(1); + ds.setMaxConnectionsPerPartition(2); ds.setMinConnectionsPerPartition(0); ds.setPartitionCount(1); ds.setAcquireIncrement(1); @@ -43,6 +42,10 @@ public class DbUtil { ds.setStatisticsEnabled(true); ds.setLogStatementsEnabled(true); ds.setLogWriter(new PrintWriter(System.err)); + return ds; + } + + public static DataSource springifyDataSource(DataSource ds) { return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); } } diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java index b09d3e9..488ee35 100644 --- a/src/test/java/io/trygvis/test/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/PlainJavaExample.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Random; import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -58,7 +59,7 @@ public class PlainJavaExample { System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); - if(r.nextInt(3) == 0) { + if (r.nextInt(3) == 0) { return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); } @@ -73,17 +74,34 @@ public class PlainJavaExample { public static void main(String[] args) throws Exception { System.out.println("Starting producer"); + int chunks = 10; + final int chunk = 2000; + DataSource ds = createDataSource(); Connection c = ds.getConnection(); SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcQueueService queueService = queueSystem.queueService; + final JdbcQueueService queueService = queueSystem.queueService; - Queue queue = queueService.lookupQueue(c, inputName, interval, true); + final Queue queue = queueService.lookupQueue(c, inputName, interval, true); + + for (int i = 0; i < chunks; i++) { + long start = currentTimeMillis(); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (int j = 0; j < chunk; j++) { + queueService.schedule(c, queue, new Date(), asList("10", "20")); + } + } + }); + long end = currentTimeMillis(); - queueService.schedule(c, queue, new Date(), asList("10", "20")); + long time = end - start; + System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second"); + } c.commit(); } diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 676eac5..a9e4a25 100755 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -2,16 +2,18 @@ - + + - + -- cgit v1.2.3