aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java58
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java22
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java2
-rw-r--r--src/main/resources/create-postgresql.sql4
4 files changed, 36 insertions, 50 deletions
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
index e03cf5e..51ad31d 100644
--- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -2,7 +2,9 @@ package io.trygvis.async;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
public class SqlEffectExecutor {
@@ -12,41 +14,13 @@ public class SqlEffectExecutor {
this.dataSource = dataSource;
}
- public <A> A transaction(SqlEffect<A> effect) {
-/*
+ public <A> A transaction(SqlEffect<A> effect) throws SQLException {
int pid;
try (Connection c = dataSource.getConnection()) {
-
- try (Statement statement = c.createStatement()) {
- ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
- rs.next();
- pid = rs.getInt(1);
- }
-
+ pid = getPid(c);
System.out.println("pid = " + pid);
- try {
- effect.doInConnection(c);
- c.commit();
- } catch (SQLException e) {
- c.rollback();
- e.printStackTrace();
- } finally {
- System.out.println("Closing pid=" + pid);
- try {
- c.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } catch (SQLException e) {
- e.printStackTrace();
- throw new SqlExecutionException(e);
- }
-*/
-
- try (Connection c = dataSource.getConnection()) {
boolean ok = false;
try {
A a = effect.doInConnection(c);
@@ -54,16 +28,19 @@ public class SqlEffectExecutor {
ok = true;
return a;
} finally {
+ System.out.println("Closing, pid = " + pid);
if (!ok) {
- c.rollback();
+ try {
+ c.rollback();
+ } catch (SQLException e) {
+ // ignore
+ }
}
}
- } catch (SQLException e) {
- throw new SqlExecutionException(e);
}
}
- public void transaction(final SqlEffect.Void effect) {
+ public void transaction(final SqlEffect.Void effect) throws SQLException {
transaction(new SqlEffect<Object>() {
@Override
public Object doInConnection(Connection c) throws SQLException {
@@ -73,12 +50,13 @@ public class SqlEffectExecutor {
});
}
- public static class SqlExecutionException extends RuntimeException {
- public final SQLException exception;
-
- public SqlExecutionException(SQLException ex) {
- super(ex);
- this.exception = ex;
+ private int getPid(Connection c) throws SQLException {
+ int pid;
+ try (Statement statement = c.createStatement()) {
+ ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
+ rs.next();
+ pid = rs.getInt(1);
}
+ return pid;
}
}
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index d284287..c99bf2e 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -26,7 +26,7 @@ public class JdbcQueueService {
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskEffect effect) {
+ public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException {
final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
@@ -47,7 +47,7 @@ public class JdbcQueueService {
});
}
- public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) {
+ public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) throws SQLException {
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
public void doInConnection(Connection connection) throws SQLException {
@@ -102,12 +102,6 @@ public class JdbcQueueService {
final Date now = new Date();
log.error("Unable to execute task, id=" + task.id(), e);
- try {
- taskDao.rollback();
- } catch (SQLException e2) {
- log.error("Error rolling back transaction after failed apply.", e2);
- }
-
final Task t = task;
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
@@ -117,6 +111,12 @@ public class JdbcQueueService {
taskDao.update(task);
}
});
+
+ if(e instanceof SQLException) {
+ throw ((SQLException) e);
+ }
+
+ throw new RuntimeException("Error while executing task, id=" + task.id(), e);
}
}
@@ -156,4 +156,10 @@ public class JdbcQueueService {
return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
+
+ public static class TaskExecutionFailed extends Throwable {
+ public TaskExecutionFailed(Exception e) {
+ super(e);
+ }
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 025823b..8b58585 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -111,7 +111,7 @@ public class TaskDao {
rs.getLong(i++),
rs.getLong(i++),
rs.getString(i++),
- valueOf(rs.getString(i++)),
+ TaskState.valueOf(rs.getString(i++).trim()),
rs.getTimestamp(i++),
rs.getTimestamp(i++),
rs.getInt(i++),
diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql
index 4773a0c..7c331fd 100644
--- a/src/main/resources/create-postgresql.sql
+++ b/src/main/resources/create-postgresql.sql
@@ -14,7 +14,7 @@ CREATE TABLE task (
id BIGINT NOT NULL,
parent BIGINT,
queue VARCHAR(100) NOT NULL,
- state VARCHAR(100) NOT NULL,
+ state CHAR(10) NOT NULL,
scheduled TIMESTAMP NOT NULL,
last_run TIMESTAMP,
run_count INT NOT NULL,
@@ -25,6 +25,8 @@ CREATE TABLE task (
CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id)
);
+CREATE INDEX ix_task__queue__state ON task (queue, state);
+
CREATE SEQUENCE task_seq;
COMMIT;