aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-09 23:22:39 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-09 23:22:39 +0200
commit1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac (patch)
treeab1ebbfbd21fe1fcc5be75d31c5262f65a931641
parent33e3be55dc2d815cbd0208bf59d12a7e727f3105 (diff)
downloadquartz-based-queue-1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac.tar.gz
quartz-based-queue-1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac.tar.bz2
quartz-based-queue-1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac.tar.xz
quartz-based-queue-1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac.zip
wip
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java58
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java22
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java2
-rw-r--r--src/main/resources/create-postgresql.sql4
-rw-r--r--src/test/java/io/trygvis/test/DbUtil.java7
-rw-r--r--src/test/java/io/trygvis/test/PlainJavaExample.java26
-rwxr-xr-xsrc/test/resources/logback.xml6
7 files changed, 67 insertions, 58 deletions
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
index e03cf5e..51ad31d 100644
--- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -2,7 +2,9 @@ package io.trygvis.async;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
public class SqlEffectExecutor {
@@ -12,41 +14,13 @@ public class SqlEffectExecutor {
this.dataSource = dataSource;
}
- public <A> A transaction(SqlEffect<A> effect) {
-/*
+ public <A> A transaction(SqlEffect<A> effect) throws SQLException {
int pid;
try (Connection c = dataSource.getConnection()) {
-
- try (Statement statement = c.createStatement()) {
- ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
- rs.next();
- pid = rs.getInt(1);
- }
-
+ pid = getPid(c);
System.out.println("pid = " + pid);
- try {
- effect.doInConnection(c);
- c.commit();
- } catch (SQLException e) {
- c.rollback();
- e.printStackTrace();
- } finally {
- System.out.println("Closing pid=" + pid);
- try {
- c.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } catch (SQLException e) {
- e.printStackTrace();
- throw new SqlExecutionException(e);
- }
-*/
-
- try (Connection c = dataSource.getConnection()) {
boolean ok = false;
try {
A a = effect.doInConnection(c);
@@ -54,16 +28,19 @@ public class SqlEffectExecutor {
ok = true;
return a;
} finally {
+ System.out.println("Closing, pid = " + pid);
if (!ok) {
- c.rollback();
+ try {
+ c.rollback();
+ } catch (SQLException e) {
+ // ignore
+ }
}
}
- } catch (SQLException e) {
- throw new SqlExecutionException(e);
}
}
- public void transaction(final SqlEffect.Void effect) {
+ public void transaction(final SqlEffect.Void effect) throws SQLException {
transaction(new SqlEffect<Object>() {
@Override
public Object doInConnection(Connection c) throws SQLException {
@@ -73,12 +50,13 @@ public class SqlEffectExecutor {
});
}
- public static class SqlExecutionException extends RuntimeException {
- public final SQLException exception;
-
- public SqlExecutionException(SQLException ex) {
- super(ex);
- this.exception = ex;
+ private int getPid(Connection c) throws SQLException {
+ int pid;
+ try (Statement statement = c.createStatement()) {
+ ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
+ rs.next();
+ pid = rs.getInt(1);
}
+ return pid;
}
}
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index d284287..c99bf2e 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -26,7 +26,7 @@ public class JdbcQueueService {
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskEffect effect) {
+ public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException {
final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
@@ -47,7 +47,7 @@ public class JdbcQueueService {
});
}
- public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) {
+ public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) throws SQLException {
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
public void doInConnection(Connection connection) throws SQLException {
@@ -102,12 +102,6 @@ public class JdbcQueueService {
final Date now = new Date();
log.error("Unable to execute task, id=" + task.id(), e);
- try {
- taskDao.rollback();
- } catch (SQLException e2) {
- log.error("Error rolling back transaction after failed apply.", e2);
- }
-
final Task t = task;
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
@@ -117,6 +111,12 @@ public class JdbcQueueService {
taskDao.update(task);
}
});
+
+ if(e instanceof SQLException) {
+ throw ((SQLException) e);
+ }
+
+ throw new RuntimeException("Error while executing task, id=" + task.id(), e);
}
}
@@ -156,4 +156,10 @@ public class JdbcQueueService {
return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
+
+ public static class TaskExecutionFailed extends Throwable {
+ public TaskExecutionFailed(Exception e) {
+ super(e);
+ }
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 025823b..8b58585 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -111,7 +111,7 @@ public class TaskDao {
rs.getLong(i++),
rs.getLong(i++),
rs.getString(i++),
- valueOf(rs.getString(i++)),
+ TaskState.valueOf(rs.getString(i++).trim()),
rs.getTimestamp(i++),
rs.getTimestamp(i++),
rs.getInt(i++),
diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql
index 4773a0c..7c331fd 100644
--- a/src/main/resources/create-postgresql.sql
+++ b/src/main/resources/create-postgresql.sql
@@ -14,7 +14,7 @@ CREATE TABLE task (
id BIGINT NOT NULL,
parent BIGINT,
queue VARCHAR(100) NOT NULL,
- state VARCHAR(100) NOT NULL,
+ state CHAR(10) NOT NULL,
scheduled TIMESTAMP NOT NULL,
last_run TIMESTAMP,
run_count INT NOT NULL,
@@ -25,6 +25,8 @@ CREATE TABLE task (
CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id)
);
+CREATE INDEX ix_task__queue__state ON task (queue, state);
+
CREATE SEQUENCE task_seq;
COMMIT;
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
index 2362e65..a2c41d3 100644
--- a/src/test/java/io/trygvis/test/DbUtil.java
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -5,7 +5,6 @@ import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
import javax.sql.DataSource;
-
import java.io.PrintWriter;
import java.sql.SQLException;
@@ -34,7 +33,7 @@ public class DbUtil {
ds.setDefaultAutoCommit(false);
ds.setIdleConnectionTestPeriodInSeconds(60);
ds.setIdleMaxAgeInSeconds(240);
- ds.setMaxConnectionsPerPartition(1);
+ ds.setMaxConnectionsPerPartition(2);
ds.setMinConnectionsPerPartition(0);
ds.setPartitionCount(1);
ds.setAcquireIncrement(1);
@@ -43,6 +42,10 @@ public class DbUtil {
ds.setStatisticsEnabled(true);
ds.setLogStatementsEnabled(true);
ds.setLogWriter(new PrintWriter(System.err));
+ return ds;
+ }
+
+ public static DataSource springifyDataSource(DataSource ds) {
return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
}
}
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java
index b09d3e9..488ee35 100644
--- a/src/test/java/io/trygvis/test/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/PlainJavaExample.java
@@ -16,6 +16,7 @@ import java.util.List;
import java.util.Random;
import static io.trygvis.test.DbUtil.createDataSource;
+import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -58,7 +59,7 @@ public class PlainJavaExample {
System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
- if(r.nextInt(3) == 0) {
+ if (r.nextInt(3) == 0) {
return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b)));
}
@@ -73,17 +74,34 @@ public class PlainJavaExample {
public static void main(String[] args) throws Exception {
System.out.println("Starting producer");
+ int chunks = 10;
+ final int chunk = 2000;
+
DataSource ds = createDataSource();
Connection c = ds.getConnection();
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- JdbcQueueService queueService = queueSystem.queueService;
+ final JdbcQueueService queueService = queueSystem.queueService;
- Queue queue = queueService.lookupQueue(c, inputName, interval, true);
+ final Queue queue = queueService.lookupQueue(c, inputName, interval, true);
+
+ for (int i = 0; i < chunks; i++) {
+ long start = currentTimeMillis();
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (int j = 0; j < chunk; j++) {
+ queueService.schedule(c, queue, new Date(), asList("10", "20"));
+ }
+ }
+ });
+ long end = currentTimeMillis();
- queueService.schedule(c, queue, new Date(), asList("10", "20"));
+ long time = end - start;
+ System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second");
+ }
c.commit();
}
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
index 676eac5..a9e4a25 100755
--- a/src/test/resources/logback.xml
+++ b/src/test/resources/logback.xml
@@ -2,16 +2,18 @@
<logger name="io.trygvis" level="DEBUG"/>
<!-- <logger name="org.springframework" level="INFO"/> -->
- <logger name="org.springframework" level="INFO"/>
+ <logger name="org.springframework" level="DEBUG"/>
+<!--
<logger name="org.springframework.jdbc" level="INFO"/>
<logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="DEBUG"/>
<logger name="org.springframework.orm" level="INFO"/>
<logger name="org.springframework.transaction" level="DEBUG"/>
+-->
<logger name="org.hibernate" level="INFO"/>
<logger name="org.hibernate.SQL" level="INFO"/>
- <logger name="com.jolbox" level="TRACE"/>
+ <logger name="com.jolbox" level="INFO"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>