aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcQueueService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java105
1 files changed, 44 insertions, 61 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index c99bf2e..edd6c80 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -10,6 +10,7 @@ import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
import static io.trygvis.queue.Task.TaskState.PROCESSING;
@@ -26,97 +27,85 @@ public class JdbcQueueService {
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException {
+ public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
- List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
- log.trace("Got {} tasks.", tasks.size());
- taskDao.setState(tasks, PROCESSING);
- return tasks;
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
}
});
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- applyTasks(c, effect, queueSystem.createTaskDao(c), tasks);
- }
- });
+ applyTasks(req, effect, tasks);
}
- public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) throws SQLException {
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection connection) throws SQLException {
- for (Task task : tasks) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- new TaskDao(connection).update(task.markProcessing());
- }
- }
- });
-
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- TaskDao taskDao = new TaskDao(c);
-
- applyTasks(c, taskEffect, taskDao, tasks);
- }
- });
+ public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException {
+ applyTasks(req, taskEffect, tasks);
}
/**
* Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
*/
- private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException {
- Task task = null;
+ private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException {
+ for (Task task : tasks) {
+ boolean ok = applyTask(effect, task);
+
+ if (!ok && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+
+ private boolean applyTask(TaskEffect effect, final Task task) throws SQLException {
try {
- for (int i = 0; i < tasks.size(); i++) {
- task = tasks.get(i);
+ final Date run = new Date();
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing());
+ }
+ });
+ if (count == 1) {
log.info("Executing task {}", task.id());
+ } else {
+ log.trace("Missed task {}", task.id());
+ }
- List<Task> newTasks = effect.apply(task);
+ final List<Task> newTasks = effect.apply(task);
- Date now = new Date();
+ final Date now = new Date();
- log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
- task = task.markOk(now);
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
- for (Task newTask : newTasks) {
- schedule(c, newTask);
+ queueSystem.createTaskDao(c).update(task.markOk(now));
}
+ });
- taskDao.update(task);
- }
- } catch (final Exception e) {
- if (task == null) {
- return;
- }
-
+ return true;
+ } catch (Exception e) {
final Date now = new Date();
log.error("Unable to execute task, id=" + task.id(), e);
- final Task t = task;
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
public void doInConnection(Connection c) throws SQLException {
TaskDao taskDao = queueSystem.createTaskDao(c);
- Task task = t.markFailed(now);
- taskDao.update(task);
+ taskDao.update(task.markFailed(now));
}
});
- if(e instanceof SQLException) {
+ if (e instanceof SQLException) {
throw ((SQLException) e);
}
- throw new RuntimeException("Error while executing task, id=" + task.id(), e);
+ return false;
}
}
@@ -156,10 +145,4 @@ public class JdbcQueueService {
return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
-
- public static class TaskExecutionFailed extends Throwable {
- public TaskExecutionFailed(Exception e) {
- super(e);
- }
- }
}