aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-09 23:51:39 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-09 23:51:39 +0200
commitabb0b2aaf4ee5e6f147987401c9b059e5a7679d2 (patch)
treebf8aeed0f29a2b81896cd48df399e133b7ebb4e0
parent1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac (diff)
downloadquartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.gz
quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.bz2
quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.xz
quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.zip
wip
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java7
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java18
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java105
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java11
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java21
-rw-r--r--src/main/java/io/trygvis/spring/SpringQueueService.java6
-rw-r--r--src/test/java/io/trygvis/test/DbUtil.java13
-rw-r--r--src/test/java/io/trygvis/test/PlainJavaExample.java5
-rwxr-xr-xsrc/test/resources/logback.xml2
9 files changed, 93 insertions, 95 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 33753a3..558e769 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -12,6 +12,9 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -56,14 +59,14 @@ class QueueThread implements Runnable {
List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name);
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
if (tasks.size() > 0) {
- queueService.executeTask(taskEffect, tasks);
+ queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks);
}
} catch (Throwable e) {
log.warn("Error while executing tasks.", e);
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
index 51ad31d..3da2cd3 100644
--- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -15,11 +15,11 @@ public class SqlEffectExecutor {
}
public <A> A transaction(SqlEffect<A> effect) throws SQLException {
- int pid;
+// int pid;
try (Connection c = dataSource.getConnection()) {
- pid = getPid(c);
- System.out.println("pid = " + pid);
+// pid = getPid(c);
+// System.out.println("pid = " + pid);
boolean ok = false;
try {
@@ -28,7 +28,7 @@ public class SqlEffectExecutor {
ok = true;
return a;
} finally {
- System.out.println("Closing, pid = " + pid);
+// System.out.println("Closing, pid = " + pid);
if (!ok) {
try {
c.rollback();
@@ -49,14 +49,4 @@ public class SqlEffectExecutor {
}
});
}
-
- private int getPid(Connection c) throws SQLException {
- int pid;
- try (Statement statement = c.createStatement()) {
- ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
- rs.next();
- pid = rs.getInt(1);
- }
- return pid;
- }
}
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index c99bf2e..edd6c80 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -10,6 +10,7 @@ import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
import static io.trygvis.queue.Task.TaskState.PROCESSING;
@@ -26,97 +27,85 @@ public class JdbcQueueService {
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException {
+ public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
- List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
- log.trace("Got {} tasks.", tasks.size());
- taskDao.setState(tasks, PROCESSING);
- return tasks;
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
}
});
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- applyTasks(c, effect, queueSystem.createTaskDao(c), tasks);
- }
- });
+ applyTasks(req, effect, tasks);
}
- public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) throws SQLException {
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection connection) throws SQLException {
- for (Task task : tasks) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- new TaskDao(connection).update(task.markProcessing());
- }
- }
- });
-
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- TaskDao taskDao = new TaskDao(c);
-
- applyTasks(c, taskEffect, taskDao, tasks);
- }
- });
+ public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException {
+ applyTasks(req, taskEffect, tasks);
}
/**
* Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
*/
- private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException {
- Task task = null;
+ private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException {
+ for (Task task : tasks) {
+ boolean ok = applyTask(effect, task);
+
+ if (!ok && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+
+ private boolean applyTask(TaskEffect effect, final Task task) throws SQLException {
try {
- for (int i = 0; i < tasks.size(); i++) {
- task = tasks.get(i);
+ final Date run = new Date();
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing());
+ }
+ });
+ if (count == 1) {
log.info("Executing task {}", task.id());
+ } else {
+ log.trace("Missed task {}", task.id());
+ }
- List<Task> newTasks = effect.apply(task);
+ final List<Task> newTasks = effect.apply(task);
- Date now = new Date();
+ final Date now = new Date();
- log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
- task = task.markOk(now);
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
- for (Task newTask : newTasks) {
- schedule(c, newTask);
+ queueSystem.createTaskDao(c).update(task.markOk(now));
}
+ });
- taskDao.update(task);
- }
- } catch (final Exception e) {
- if (task == null) {
- return;
- }
-
+ return true;
+ } catch (Exception e) {
final Date now = new Date();
log.error("Unable to execute task, id=" + task.id(), e);
- final Task t = task;
sqlEffectExecutor.transaction(new SqlEffect.Void() {
@Override
public void doInConnection(Connection c) throws SQLException {
TaskDao taskDao = queueSystem.createTaskDao(c);
- Task task = t.markFailed(now);
- taskDao.update(task);
+ taskDao.update(task.markFailed(now));
}
});
- if(e instanceof SQLException) {
+ if (e instanceof SQLException) {
throw ((SQLException) e);
}
- throw new RuntimeException("Error while executing task, id=" + task.id(), e);
+ return false;
}
}
@@ -156,10 +145,4 @@ public class JdbcQueueService {
return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
-
- public static class TaskExecutionFailed extends Throwable {
- public TaskExecutionFailed(Exception e) {
- super(e);
- }
- }
}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index 9773766..ee74bf4 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -5,9 +5,18 @@ import java.util.Date;
import java.util.List;
public interface QueueService {
- void consume(Queue queue, TaskEffect effect) throws SQLException;
+ void consume(Queue queue, TaskExecutionRequest req, TaskEffect effect) throws SQLException;
Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException;
void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
+
+ public static class TaskExecutionRequest {
+ public final boolean stopOnError;
+ // TODO: saveExceptions
+
+ public TaskExecutionRequest(boolean stopOnError) {
+ this.stopOnError = stopOnError;
+ }
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 8b58585..9adec8f 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -57,10 +57,11 @@ public class TaskDao {
}
}
- public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException {
- try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
+ public List<Task> findByQueueAndState(String queue, TaskState state) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=?")) {
int i = 1;
- stmt.setString(i, name);
+ stmt.setString(i++, queue);
+ stmt.setString(i, state.name());
ResultSet rs = stmt.executeQuery();
List<Task> list = new ArrayList<>();
while (rs.next()) {
@@ -70,7 +71,7 @@ public class TaskDao {
}
}
- public void update(Task task) throws SQLException {
+ public int update(Task task) throws SQLException {
try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
int i = 1;
stmt.setString(i++, task.state.name());
@@ -79,19 +80,15 @@ public class TaskDao {
stmt.setInt(i++, task.runCount);
setTimestamp(stmt, i++, task.completed);
stmt.setLong(i, task.id());
- stmt.executeUpdate();
+ return stmt.executeUpdate();
}
}
- public void setState(List<Task> tasks, TaskState state) throws SQLException {
- Long[] ids = new Long[tasks.size()];
- for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
- ids[i] = tasks.get(i).id();
- }
- try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ANY (?)")) {
+ public void setState(Task task, TaskState state) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) {
int i = 1;
stmt.setString(i++, state.name());
- stmt.setObject(i, c.createArrayOf("bigint", ids));
+ stmt.setLong(i, task.id());
stmt.executeUpdate();
}
}
diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java
index 21746e5..271e9bf 100644
--- a/src/main/java/io/trygvis/spring/SpringQueueService.java
+++ b/src/main/java/io/trygvis/spring/SpringQueueService.java
@@ -27,10 +27,10 @@ public class SpringQueueService implements QueueService {
}
/**
- * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.TaskEffect)
+ * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.QueueService.TaskExecutionRequest, io.trygvis.queue.TaskEffect)
*/
- public void consume(final Queue queue, final TaskEffect effect) throws SQLException {
- queueService.consumeAll(queue, effect);
+ public void consume(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ queueService.consumeAll(queue, req, effect);
}
@Transactional
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
index a2c41d3..46459b0 100644
--- a/src/test/java/io/trygvis/test/DbUtil.java
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -6,7 +6,10 @@ import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
import javax.sql.DataSource;
import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import static java.lang.System.getProperty;
@@ -48,4 +51,14 @@ public class DbUtil {
public static DataSource springifyDataSource(DataSource ds) {
return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
}
+
+ public static int getPid(Connection c) throws SQLException {
+ int pid;
+ try (Statement statement = c.createStatement()) {
+ ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
+ rs.next();
+ pid = rs.getInt(1);
+ }
+ return pid;
+ }
}
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java
index 488ee35..788d8a0 100644
--- a/src/test/java/io/trygvis/test/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/PlainJavaExample.java
@@ -4,6 +4,7 @@ import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -51,7 +52,9 @@ public class PlainJavaExample {
final Queue input = queues[0];
final Queue output = queues[1];
- queueService.consumeAll(input, new TaskEffect() {
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(false);
+
+ queueService.consumeAll(input, req, new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments);
Long a = Long.valueOf(task.arguments.get(0));
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
index a9e4a25..65b37a6 100755
--- a/src/test/resources/logback.xml
+++ b/src/test/resources/logback.xml
@@ -13,7 +13,7 @@
<logger name="org.hibernate" level="INFO"/>
<logger name="org.hibernate.SQL" level="INFO"/>
- <logger name="com.jolbox" level="INFO"/>
+ <logger name="com.jolbox" level="DEBUG"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>