aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcQueueService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java131
1 files changed, 17 insertions, 114 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index cb7af4b..a366838 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -1,121 +1,42 @@
package io.trygvis.queue;
-import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Date;
-import java.util.List;
-
-import static io.trygvis.queue.QueueService.TaskExecutionRequest;
-import static io.trygvis.queue.Task.TaskState.NEW;
+import java.util.HashMap;
+import java.util.Map;
public class JdbcQueueService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
private final QueueSystem queueSystem;
private final SqlEffectExecutor sqlEffectExecutor;
+ private final Map<String, QueueExecutor> queues = new HashMap<>();
+
JdbcQueueService(QueueSystem queueSystem) {
this.queueSystem = queueSystem;
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
- log.info("Consuming tasks: request={}", req);
-
- List<Task> tasks;
- do {
- tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
- }
- });
-
- log.info("Consuming chunk with {} tasks", tasks.size());
-
- applyTasks(req, effect, tasks);
- } while (tasks.size() > 0);
- }
-
- public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException {
- applyTasks(req, taskEffect, tasks);
- }
+ public synchronized QueueExecutor getQueue(String name) {
+ QueueExecutor queueExecutor = queues.get(name);
- /**
- * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
- */
- private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException {
- for (Task task : tasks) {
- boolean ok = applyTask(effect, task);
-
- if (!ok && req.stopOnError) {
- throw new RuntimeException("Error while executing task, id=" + task.id());
- }
+ if (queueExecutor != null) {
+ return queueExecutor;
}
- }
-
- private boolean applyTask(TaskEffect effect, final Task task) throws SQLException {
- try {
- final Date run = new Date();
- Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
- @Override
- public Integer doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).update(task.markProcessing());
- }
- });
-
- if (count == 1) {
- log.info("Executing task {}", task.id());
- } else {
- log.trace("Missed task {}", task.id());
- }
-
- final List<Task> newTasks = effect.apply(task);
-
- final Date now = new Date();
-
- log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- for (Task newTask : newTasks) {
- schedule(c, newTask);
- }
-
- queueSystem.createTaskDao(c).update(task.markOk(now));
- }
- });
-
- return true;
- } catch (Exception e) {
- final Date now = new Date();
- log.error("Unable to execute task, id=" + task.id(), e);
-
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
- taskDao.update(task.markFailed(now));
- }
- });
+ throw new IllegalArgumentException("No such queue: " + name);
+ }
- if (e instanceof SQLException) {
- throw ((SQLException) e);
- }
+ public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException {
+ QueueExecutor queueExecutor = queues.get(name);
- return false;
+ if (queueExecutor != null) {
+ return queueExecutor;
}
- }
- public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
QueueDao queueDao = queueSystem.createQueueDao(c);
Queue q = queueDao.findByName(name);
@@ -129,26 +50,8 @@ public class JdbcQueueService {
queueDao.insert(q);
}
- return q;
- }
-
- public void schedule(Connection c, Task task) throws SQLException {
- schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
- }
-
- public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
- return schedule(c, queue.name, null, scheduled, arguments);
- }
-
- public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException {
- return schedule(c, queue.name, parent, scheduled, arguments);
- }
-
- private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
- long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
-
- return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q);
+ queues.put(name, queueExecutor);
+ return queueExecutor;
}
}