From 1ec4fae12c5e5363591013e5a759590d913d6782 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 12:07:43 +0200 Subject: wip --- .../java/io/trygvis/queue/JdbcQueueService.java | 131 +++------------------ 1 file changed, 17 insertions(+), 114 deletions(-) (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java') diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index cb7af4b..a366838 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -1,121 +1,42 @@ package io.trygvis.queue; -import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; -import java.util.List; - -import static io.trygvis.queue.QueueService.TaskExecutionRequest; -import static io.trygvis.queue.Task.TaskState.NEW; +import java.util.HashMap; +import java.util.Map; public class JdbcQueueService { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final QueueSystem queueSystem; private final SqlEffectExecutor sqlEffectExecutor; + private final Map queues = new HashMap<>(); + JdbcQueueService(QueueSystem queueSystem) { this.queueSystem = queueSystem; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - log.info("Consuming tasks: request={}", req); - - List tasks; - do { - tasks = sqlEffectExecutor.transaction(new SqlEffect>() { - @Override - public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); - } - }); - - log.info("Consuming chunk with {} tasks", tasks.size()); - - applyTasks(req, effect, tasks); - } while (tasks.size() > 0); - } - - public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List tasks) throws SQLException { - applyTasks(req, taskEffect, tasks); - } + public synchronized QueueExecutor getQueue(String name) { + QueueExecutor queueExecutor = queues.get(name); - /** - * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. - */ - private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List tasks) throws SQLException { - for (Task task : tasks) { - boolean ok = applyTask(effect, task); - - if (!ok && req.stopOnError) { - throw new RuntimeException("Error while executing task, id=" + task.id()); - } + if (queueExecutor != null) { + return queueExecutor; } - } - - private boolean applyTask(TaskEffect effect, final Task task) throws SQLException { - try { - final Date run = new Date(); - Integer count = sqlEffectExecutor.transaction(new SqlEffect() { - @Override - public Integer doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).update(task.markProcessing()); - } - }); - - if (count == 1) { - log.info("Executing task {}", task.id()); - } else { - log.trace("Missed task {}", task.id()); - } - - final List newTasks = effect.apply(task); - - final Date now = new Date(); - - log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - for (Task newTask : newTasks) { - schedule(c, newTask); - } - - queueSystem.createTaskDao(c).update(task.markOk(now)); - } - }); - - return true; - } catch (Exception e) { - final Date now = new Date(); - log.error("Unable to execute task, id=" + task.id(), e); - - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - taskDao.update(task.markFailed(now)); - } - }); + throw new IllegalArgumentException("No such queue: " + name); + } - if (e instanceof SQLException) { - throw ((SQLException) e); - } + public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException { + QueueExecutor queueExecutor = queues.get(name); - return false; + if (queueExecutor != null) { + return queueExecutor; } - } - public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { QueueDao queueDao = queueSystem.createQueueDao(c); Queue q = queueDao.findByName(name); @@ -129,26 +50,8 @@ public class JdbcQueueService { queueDao.insert(q); } - return q; - } - - public void schedule(Connection c, Task task) throws SQLException { - schedule(c, task.queue, task.parent, task.scheduled, task.arguments); - } - - public Task schedule(Connection c, Queue queue, Date scheduled, List arguments) throws SQLException { - return schedule(c, queue.name, null, scheduled, arguments); - } - - public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List arguments) throws SQLException { - return schedule(c, queue.name, parent, scheduled, arguments); - } - - private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); - - return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q); + queues.put(name, queueExecutor); + return queueExecutor; } } -- cgit v1.2.3