aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcQueueService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java22
1 files changed, 14 insertions, 8 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index d284287..c99bf2e 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -26,7 +26,7 @@ public class JdbcQueueService {
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskEffect effect) {
+ public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException {
final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
@@ -47,7 +47,7 @@ public class JdbcQueueService {
});
}
- public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) {
+ public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) throws SQLException {
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
public void doInConnection(Connection connection) throws SQLException {
@@ -102,12 +102,6 @@ public class JdbcQueueService {
final Date now = new Date();
log.error("Unable to execute task, id=" + task.id(), e);
- try {
- taskDao.rollback();
- } catch (SQLException e2) {
- log.error("Error rolling back transaction after failed apply.", e2);
- }
-
final Task t = task;
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
@@ -117,6 +111,12 @@ public class JdbcQueueService {
taskDao.update(task);
}
});
+
+ if(e instanceof SQLException) {
+ throw ((SQLException) e);
+ }
+
+ throw new RuntimeException("Error while executing task, id=" + task.id(), e);
}
}
@@ -156,4 +156,10 @@ public class JdbcQueueService {
return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
+
+ public static class TaskExecutionFailed extends Throwable {
+ public TaskExecutionFailed(Exception e) {
+ super(e);
+ }
+ }
}