aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java39
1 files changed, 10 insertions, 29 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index c34330e..310c59b 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,7 +1,6 @@
package io.trygvis.async;
import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueDao;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
@@ -11,12 +10,12 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
public class JdbcAsyncService {
@@ -24,25 +23,12 @@ public class JdbcAsyncService {
private final Map<String, QueueThread> queues = new HashMap<>();
- public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException {
- QueueDao queueDao = new QueueDao(c);
+ public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) {
+ final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue);
- log.info("registerQueue: ENTER");
-
- Queue q = queueDao.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- queueDao.insert(q);
- }
-
- final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);
- queues.put(name, queueThread);
+ queues.put(queue.name, queueThread);
log.info("registerQueue: LEAVE");
- return q;
}
public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
@@ -74,26 +60,21 @@ public class JdbcAsyncService {
return queueThread.queue;
}
- public Task schedule(Connection c, final Queue queue, String... args) throws SQLException {
+ public Task schedule(Connection c, final Queue queue, List<String> args) throws SQLException {
return scheduleInner(c, null, queue, args);
}
- public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException {
+ public Task schedule(Connection c, long parent, Queue queue, List<String> args) throws SQLException {
return scheduleInner(c, parent, queue, args);
}
- private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException {
+ private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
TaskDao taskDao = new TaskDao(c);
Date scheduled = new Date();
- StringBuilder arguments = new StringBuilder();
- for (String arg : args) {
- arguments.append(arg).append(' ');
- }
-
- long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));
+ long id = taskDao.insert(parent, queue.name, scheduled, args);
+ Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args);
log.info("Created task = {}", task);
return task;
@@ -123,6 +104,6 @@ public class JdbcAsyncService {
public Task update(Connection c, Task ref) throws SQLException {
TaskDao taskDao = new TaskDao(c);
- return taskDao.findById(ref.id);
+ return taskDao.findById(ref.id());
}
}