package io.trygvis.queue; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; import static io.trygvis.queue.Task.TaskState.PROCESSING; public class JdbcQueueService { private final Logger log = LoggerFactory.getLogger(getClass()); private final QueueSystem queueSystem; private final SqlEffectExecutor sqlEffectExecutor; JdbcQueueService(QueueSystem queueSystem) { this.queueSystem = queueSystem; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { final List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); applyTasks(req, effect, tasks); } public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List tasks) throws SQLException { applyTasks(req, taskEffect, tasks); } /** * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. */ private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List tasks) throws SQLException { for (Task task : tasks) { boolean ok = applyTask(effect, task); if (!ok && req.stopOnError) { throw new RuntimeException("Error while executing task, id=" + task.id()); } } } private boolean applyTask(TaskEffect effect, final Task task) throws SQLException { try { final Date run = new Date(); Integer count = sqlEffectExecutor.transaction(new SqlEffect() { @Override public Integer doInConnection(Connection c) throws SQLException { return queueSystem.createTaskDao(c).update(task.markProcessing()); } }); if (count == 1) { log.info("Executing task {}", task.id()); } else { log.trace("Missed task {}", task.id()); } final List newTasks = effect.apply(task); final Date now = new Date(); log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { for (Task newTask : newTasks) { schedule(c, newTask); } queueSystem.createTaskDao(c).update(task.markOk(now)); } }); return true; } catch (Exception e) { final Date now = new Date(); log.error("Unable to execute task, id=" + task.id(), e); sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); taskDao.update(task.markFailed(now)); } }); if (e instanceof SQLException) { throw ((SQLException) e); } return false; } } public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { QueueDao queueDao = queueSystem.createQueueDao(c); Queue q = queueDao.findByName(name); if (q == null) { if (!autoCreate) { throw new SQLException("No such queue: '" + name + "'."); } q = new Queue(name, interval); queueDao.insert(q); } return q; } public void schedule(Connection c, Task task) throws SQLException { schedule(c, task.queue, task.parent, task.scheduled, task.arguments); } public Task schedule(Connection c, Queue queue, Date scheduled, List arguments) throws SQLException { return schedule(c, queue.name, null, scheduled, arguments); } public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List arguments) throws SQLException { return schedule(c, queue.name, parent, scheduled, arguments); } private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } }