From abb0b2aaf4ee5e6f147987401c9b059e5a7679d2 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 9 Jun 2013 23:51:39 +0200 Subject: wip --- src/main/java/io/trygvis/async/QueueThread.java | 7 +- .../java/io/trygvis/async/SqlEffectExecutor.java | 18 +--- .../java/io/trygvis/queue/JdbcQueueService.java | 105 +++++++++------------ src/main/java/io/trygvis/queue/QueueService.java | 11 ++- src/main/java/io/trygvis/queue/TaskDao.java | 21 ++--- .../java/io/trygvis/spring/SpringQueueService.java | 6 +- src/test/java/io/trygvis/test/DbUtil.java | 13 +++ .../java/io/trygvis/test/PlainJavaExample.java | 5 +- src/test/resources/logback.xml | 2 +- 9 files changed, 93 insertions(+), 95 deletions(-) diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 33753a3..558e769 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -12,6 +12,9 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -56,14 +59,14 @@ class QueueThread implements Runnable { List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name); + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - queueService.executeTask(taskEffect, tasks); + queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java index 51ad31d..3da2cd3 100644 --- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -15,11 +15,11 @@ public class SqlEffectExecutor { } public A transaction(SqlEffect effect) throws SQLException { - int pid; +// int pid; try (Connection c = dataSource.getConnection()) { - pid = getPid(c); - System.out.println("pid = " + pid); +// pid = getPid(c); +// System.out.println("pid = " + pid); boolean ok = false; try { @@ -28,7 +28,7 @@ public class SqlEffectExecutor { ok = true; return a; } finally { - System.out.println("Closing, pid = " + pid); +// System.out.println("Closing, pid = " + pid); if (!ok) { try { c.rollback(); @@ -49,14 +49,4 @@ public class SqlEffectExecutor { } }); } - - private int getPid(Connection c) throws SQLException { - int pid; - try (Statement statement = c.createStatement()) { - ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); - rs.next(); - pid = rs.getInt(1); - } - return pid; - } } diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index c99bf2e..edd6c80 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -10,6 +10,7 @@ import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; import static io.trygvis.queue.Task.TaskState.PROCESSING; @@ -26,97 +27,85 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException { + public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { final List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - log.trace("Got {} tasks.", tasks.size()); - taskDao.setState(tasks, PROCESSING); - return tasks; + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - applyTasks(c, effect, queueSystem.createTaskDao(c), tasks); - } - }); + applyTasks(req, effect, tasks); } - public void executeTask(final TaskEffect taskEffect, final List tasks) throws SQLException { - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection connection) throws SQLException { - for (Task task : tasks) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - new TaskDao(connection).update(task.markProcessing()); - } - } - }); - - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - TaskDao taskDao = new TaskDao(c); - - applyTasks(c, taskEffect, taskDao, tasks); - } - }); + public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List tasks) throws SQLException { + applyTasks(req, taskEffect, tasks); } /** * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. */ - private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List tasks) throws SQLException { - Task task = null; + private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List tasks) throws SQLException { + for (Task task : tasks) { + boolean ok = applyTask(effect, task); + + if (!ok && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + private boolean applyTask(TaskEffect effect, final Task task) throws SQLException { try { - for (int i = 0; i < tasks.size(); i++) { - task = tasks.get(i); + final Date run = new Date(); + Integer count = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing()); + } + }); + if (count == 1) { log.info("Executing task {}", task.id()); + } else { + log.trace("Missed task {}", task.id()); + } - List newTasks = effect.apply(task); + final List newTasks = effect.apply(task); - Date now = new Date(); + final Date now = new Date(); - log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); - task = task.markOk(now); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } - for (Task newTask : newTasks) { - schedule(c, newTask); + queueSystem.createTaskDao(c).update(task.markOk(now)); } + }); - taskDao.update(task); - } - } catch (final Exception e) { - if (task == null) { - return; - } - + return true; + } catch (Exception e) { final Date now = new Date(); log.error("Unable to execute task, id=" + task.id(), e); - final Task t = task; sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); - Task task = t.markFailed(now); - taskDao.update(task); + taskDao.update(task.markFailed(now)); } }); - if(e instanceof SQLException) { + if (e instanceof SQLException) { throw ((SQLException) e); } - throw new RuntimeException("Error while executing task, id=" + task.id(), e); + return false; } } @@ -156,10 +145,4 @@ public class JdbcQueueService { return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } - - public static class TaskExecutionFailed extends Throwable { - public TaskExecutionFailed(Exception e) { - super(e); - } - } } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index 9773766..ee74bf4 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -5,9 +5,18 @@ import java.util.Date; import java.util.List; public interface QueueService { - void consume(Queue queue, TaskEffect effect) throws SQLException; + void consume(Queue queue, TaskExecutionRequest req, TaskEffect effect) throws SQLException; Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException; void schedule(Queue queue, Date scheduled, List arguments) throws SQLException; + + public static class TaskExecutionRequest { + public final boolean stopOnError; + // TODO: saveExceptions + + public TaskExecutionRequest(boolean stopOnError) { + this.stopOnError = stopOnError; + } + } } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 8b58585..9adec8f 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -57,10 +57,11 @@ public class TaskDao { } } - public List findByNameAndCompletedIsNull(String name) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) { + public List findByQueueAndState(String queue, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=?")) { int i = 1; - stmt.setString(i, name); + stmt.setString(i++, queue); + stmt.setString(i, state.name()); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); while (rs.next()) { @@ -70,7 +71,7 @@ public class TaskDao { } } - public void update(Task task) throws SQLException { + public int update(Task task) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { int i = 1; stmt.setString(i++, task.state.name()); @@ -79,19 +80,15 @@ public class TaskDao { stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); stmt.setLong(i, task.id()); - stmt.executeUpdate(); + return stmt.executeUpdate(); } } - public void setState(List tasks, TaskState state) throws SQLException { - Long[] ids = new Long[tasks.size()]; - for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) { - ids[i] = tasks.get(i).id(); - } - try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ANY (?)")) { + public void setState(Task task, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { int i = 1; stmt.setString(i++, state.name()); - stmt.setObject(i, c.createArrayOf("bigint", ids)); + stmt.setLong(i, task.id()); stmt.executeUpdate(); } } diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java index 21746e5..271e9bf 100644 --- a/src/main/java/io/trygvis/spring/SpringQueueService.java +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -27,10 +27,10 @@ public class SpringQueueService implements QueueService { } /** - * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.TaskEffect) + * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.QueueService.TaskExecutionRequest, io.trygvis.queue.TaskEffect) */ - public void consume(final Queue queue, final TaskEffect effect) throws SQLException { - queueService.consumeAll(queue, effect); + public void consume(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + queueService.consumeAll(queue, req, effect); } @Transactional diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java index a2c41d3..46459b0 100644 --- a/src/test/java/io/trygvis/test/DbUtil.java +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -6,7 +6,10 @@ import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; import javax.sql.DataSource; import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import static java.lang.System.getProperty; @@ -48,4 +51,14 @@ public class DbUtil { public static DataSource springifyDataSource(DataSource ds) { return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); } + + public static int getPid(Connection c) throws SQLException { + int pid; + try (Statement statement = c.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); + rs.next(); + pid = rs.getInt(1); + } + return pid; + } } diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java index 488ee35..788d8a0 100644 --- a/src/test/java/io/trygvis/test/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/PlainJavaExample.java @@ -4,6 +4,7 @@ import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -51,7 +52,9 @@ public class PlainJavaExample { final Queue input = queues[0]; final Queue output = queues[1]; - queueService.consumeAll(input, new TaskEffect() { + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(false); + + queueService.consumeAll(input, req, new TaskEffect() { public List apply(Task task) throws Exception { System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments); Long a = Long.valueOf(task.arguments.get(0)); diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index a9e4a25..65b37a6 100755 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -13,7 +13,7 @@ - + -- cgit v1.2.3