aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java109
1 files changed, 42 insertions, 67 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 4e78a37..c34330e 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -4,88 +4,66 @@ import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueDao;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
-import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.TransactionSynchronization;
-import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.springframework.transaction.annotation.Propagation.REQUIRED;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
-@Component
-public class JdbcAsyncService implements AsyncService {
+public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
private final Map<String, QueueThread> queues = new HashMap<>();
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueDao queueDao;
-
- @Autowired
- private TaskDao taskDao;
+ public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException {
+ QueueDao queueDao = new QueueDao(c);
- @Transactional(propagation = REQUIRED)
- public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
log.info("registerQueue: ENTER");
Queue q = queueDao.findByName(name);
log.info("q = {}", q);
- final long interval_;
if (q == null) {
q = new Queue(name, interval);
queueDao.insert(q);
- interval_ = interval;
- } else {
- // Found an existing queue. Use the Settings from the database.
- interval_ = q.interval;
}
- final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable);
+ final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);
queues.put(name, queueThread);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- log.info("Transaction completed with status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval_, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
- }
- }
- });
-
log.info("registerQueue: LEAVE");
return q;
}
+ public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
+ final QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: " + name);
+ }
+
+ long interval = queueThread.queue.interval;
+ log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ queueThread.ping();
+ }
+ }, 10, interval, SECONDS);
+ Thread thread = new Thread(queueThread, name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
public Queue getQueue(String name) {
QueueThread queueThread = queues.get(name);
@@ -96,16 +74,17 @@ public class JdbcAsyncService implements AsyncService {
return queueThread.queue;
}
- @Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, String... args) {
- return scheduleInner(null, queue, args);
+ public Task schedule(Connection c, final Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, null, queue, args);
}
- public Task schedule(long parent, Queue queue, String... args) {
- return scheduleInner(parent, queue, args);
+ public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, parent, queue, args);
}
- private Task scheduleInner(Long parent, final Queue queue, String... args) {
+ private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
Date scheduled = new Date();
StringBuilder arguments = new StringBuilder();
@@ -114,27 +93,22 @@ public class JdbcAsyncService implements AsyncService {
}
long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args));
+ Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));
log.info("Created task = {}", task);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- queues.get(queue.name).ping();
- }
- }
- });
-
return task;
}
- @Transactional
- public Task await(Task task, long timeout) {
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
final long start = currentTimeMillis();
final long end = start + timeout;
while (currentTimeMillis() < end) {
- task = update(task);
+ task = update(c, task);
+
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
try {
sleep(100);
@@ -146,8 +120,9 @@ public class JdbcAsyncService implements AsyncService {
return task;
}
- @Transactional(readOnly = true)
- public Task update(Task ref) {
+ public Task update(Connection c, Task ref) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
return taskDao.findById(ref.id);
}
}