aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java27
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java4
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java17
3 files changed, 28 insertions, 20 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
index a1eb3b7..3739532 100644
--- a/src/main/java/io/trygvis/queue/QueueExecutor.java
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -10,11 +10,8 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
import static io.trygvis.queue.Task.TaskState.NEW;
public class QueueExecutor {
@@ -51,6 +48,10 @@ public class QueueExecutor {
}
}
+ public QueueStats getStats() {
+ return stats.toStats();
+ }
+
public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
log.info("Consuming tasks: request={}", req);
@@ -69,17 +70,7 @@ public class QueueExecutor {
} while (tasks.size() > 0);
}
- public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks,
- ScheduledThreadPoolExecutor executor) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- applyTasks(req, taskEffect, tasks);
- }
- });
- }
-
- private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
for (Task task : tasks) {
TaskExecutionResult result = applyTask(effect, task);
@@ -94,17 +85,17 @@ public class QueueExecutor {
* <p/>
* If the task fails, the status is set to error in a separate transaction.
*/
- private TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
try {
Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
@Override
public Integer doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).update(task.markProcessing());
+ return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
}
});
if (count == 0) {
- log.trace("Missed task {}", task.id());
+ log.warn("Missed task {}", task.id());
return MISSED;
}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index eee14ed..f4ce536 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -15,6 +15,10 @@ public interface QueueService {
// TODO: saveExceptions
public TaskExecutionRequest(long chunkSize, boolean stopOnError) {
+ if (chunkSize <= 0) {
+ throw new IllegalArgumentException("chunkSize has to be bigger than zero.");
+ }
+
this.chunkSize = chunkSize;
this.stopOnError = stopOnError;
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 5d77a41..365b44b 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -84,14 +84,27 @@ public class TaskDao {
}
public int update(Task task) throws SQLException {
- try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
+ return update(task, null);
+ }
+
+ public int update(Task task, TaskState state) throws SQLException {
+ String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?";
+
+ if (state != null) {
+ sql += " AND state=?";
+ }
+
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
int i = 1;
stmt.setString(i++, task.state.name());
stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
setTimestamp(stmt, i++, task.lastRun);
stmt.setInt(i++, task.runCount);
setTimestamp(stmt, i++, task.completed);
- stmt.setLong(i, task.id());
+ stmt.setLong(i++, task.id());
+ if (state != null) {
+ stmt.setString(i, state.name());
+ }
return stmt.executeUpdate();
}
}