From 7d704feb86c44fca57941d223e8605b55fcf68f0 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 29 May 2013 22:16:50 +0200 Subject: o Splitting out the parts that implement the "async" features vs the "queue" features. --- .../java/io/trygvis/async/JdbcAsyncService.java | 153 +++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 src/main/java/io/trygvis/async/JdbcAsyncService.java (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java') diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..4e78a37 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,153 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueDao; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +@Component +public class JdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final Map queues = new HashMap<>(); + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueDao queueDao; + + @Autowired + private TaskDao taskDao; + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueDao.findByName(name); + + log.info("q = {}", q); + + final long interval_; + if (q == null) { + q = new Queue(name, interval); + queueDao.insert(q); + interval_ = interval; + } else { + // Found an existing queue. Use the Settings from the database. + interval_ = q.interval; + } + + final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); + queues.put(name, queueThread); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 10, interval_, SECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + + public Queue getQueue(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + + return queueThread.queue; + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, String... args) { + return scheduleInner(null, queue, args); + } + + public Task schedule(long parent, Queue queue, String... args) { + return scheduleInner(parent, queue, args); + } + + private Task scheduleInner(Long parent, final Queue queue, String... args) { + Date scheduled = new Date(); + + StringBuilder arguments = new StringBuilder(); + for (String arg : args) { + arguments.append(arg).append(' '); + } + + long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); + Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); + log.info("Created task = {}", task); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queues.get(queue.name).ping(); + } + } + }); + + return task; + } + + @Transactional + public Task await(Task task, long timeout) { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(task); + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + + @Transactional(readOnly = true) + public Task update(Task ref) { + return taskDao.findById(ref.id); + } +} -- cgit v1.2.3