aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-09 23:51:39 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-09 23:51:39 +0200
commitabb0b2aaf4ee5e6f147987401c9b059e5a7679d2 (patch)
treebf8aeed0f29a2b81896cd48df399e133b7ebb4e0 /src/main/java/io/trygvis/async
parent1c0fc92c719f3856653d0efcc5fe4a1fa30b7bac (diff)
downloadquartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.gz
quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.bz2
quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.tar.xz
quartz-based-queue-abb0b2aaf4ee5e6f147987401c9b059e5a7679d2.zip
wip
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java7
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java18
2 files changed, 9 insertions, 16 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 33753a3..558e769 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -12,6 +12,9 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -56,14 +59,14 @@ class QueueThread implements Runnable {
List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name);
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
if (tasks.size() > 0) {
- queueService.executeTask(taskEffect, tasks);
+ queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks);
}
} catch (Throwable e) {
log.warn("Error while executing tasks.", e);
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
index 51ad31d..3da2cd3 100644
--- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -15,11 +15,11 @@ public class SqlEffectExecutor {
}
public <A> A transaction(SqlEffect<A> effect) throws SQLException {
- int pid;
+// int pid;
try (Connection c = dataSource.getConnection()) {
- pid = getPid(c);
- System.out.println("pid = " + pid);
+// pid = getPid(c);
+// System.out.println("pid = " + pid);
boolean ok = false;
try {
@@ -28,7 +28,7 @@ public class SqlEffectExecutor {
ok = true;
return a;
} finally {
- System.out.println("Closing, pid = " + pid);
+// System.out.println("Closing, pid = " + pid);
if (!ok) {
try {
c.rollback();
@@ -49,14 +49,4 @@ public class SqlEffectExecutor {
}
});
}
-
- private int getPid(Connection c) throws SQLException {
- int pid;
- try (Statement statement = c.createStatement()) {
- ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
- rs.next();
- pid = rs.getInt(1);
- }
- return pid;
- }
}