diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-09 23:51:39 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-09 23:51:39 +0200 |
commit | abb0b2aaf4ee5e6f147987401c9b059e5a7679d2 (patch) | |
tree | bf8aeed0f29a2b81896cd48df399e133b7ebb4e0 /src/main/java/io/trygvis/async | |
parent | 1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac (diff) | |
download | quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.gz quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.bz2 quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.xz quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.zip |
wip
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 7 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/SqlEffectExecutor.java | 18 |
2 files changed, 9 insertions, 16 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 33753a3..558e769 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -12,6 +12,9 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -56,14 +59,14 @@ class QueueThread implements Runnable { List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name); + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - queueService.executeTask(taskEffect, tasks); + queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java index 51ad31d..3da2cd3 100644 --- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -15,11 +15,11 @@ public class SqlEffectExecutor { } public <A> A transaction(SqlEffect<A> effect) throws SQLException { - int pid; +// int pid; try (Connection c = dataSource.getConnection()) { - pid = getPid(c); - System.out.println("pid = " + pid); +// pid = getPid(c); +// System.out.println("pid = " + pid); boolean ok = false; try { @@ -28,7 +28,7 @@ public class SqlEffectExecutor { ok = true; return a; } finally { - System.out.println("Closing, pid = " + pid); +// System.out.println("Closing, pid = " + pid); if (!ok) { try { c.rollback(); @@ -49,14 +49,4 @@ public class SqlEffectExecutor { } }); } - - private int getPid(Connection c) throws SQLException { - int pid; - try (Statement statement = c.createStatement()) { - ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); - rs.next(); - pid = rs.getInt(1); - } - return pid; - } } |