From 7d704feb86c44fca57941d223e8605b55fcf68f0 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 29 May 2013 22:16:50 +0200 Subject: o Splitting out the parts that implement the "async" features vs the "queue" features. --- .../java/io/trygvis/queue/JdbcAsyncService.java | 149 --------------------- 1 file changed, 149 deletions(-) delete mode 100644 src/main/java/io/trygvis/queue/JdbcAsyncService.java (limited to 'src/main/java/io/trygvis/queue/JdbcAsyncService.java') diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java deleted file mode 100644 index 276541f..0000000 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ /dev/null @@ -1,149 +0,0 @@ -package io.trygvis.queue; - -import org.quartz.SchedulerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static java.lang.System.currentTimeMillis; -import static java.lang.Thread.sleep; -import static java.util.Arrays.asList; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; - -@Component -public class JdbcAsyncService implements AsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - private final Map queues = new HashMap<>(); - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueDao queueDao; - - @Autowired - private TaskDao taskDao; - - @Transactional(propagation = REQUIRED) - public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { - log.info("registerQueue: ENTER"); - - Queue q = queueDao.findByName(name); - - log.info("q = {}", q); - - final long interval_; - if (q == null) { - q = new Queue(name, interval); - queueDao.insert(q); - interval_ = interval; - } else { - // Found an existing queue. Use the Settings from the database. - interval_ = q.interval; - } - - final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); - queues.put(name, queueThread); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - log.info("Transaction completed with status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - log.info("Starting thread for queue {} with poll interval = {}s", name, interval); - executor.scheduleAtFixedRate(new Runnable() { - public void run() { - queueThread.ping(); - } - }, 10, interval_, SECONDS); - Thread thread = new Thread(queueThread, name); - thread.setDaemon(true); - thread.start(); - } - } - }); - - log.info("registerQueue: LEAVE"); - return q; - } - - public Queue getQueue(String name) { - QueueThread queueThread = queues.get(name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + name + "'."); - } - - return queueThread.queue; - } - - @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, String... args) { - return scheduleInner(null, queue, args); - } - - public Task schedule(long parent, Queue queue, String... args) { - return scheduleInner(parent, queue, args); - } - - private Task scheduleInner(Long parent, final Queue queue, String... args) { - Date scheduled = new Date(); - - StringBuilder arguments = new StringBuilder(); - for (String arg : args) { - arguments.append(arg).append(' '); - } - - long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); - Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); - log.info("Created task = {}", task); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - if (status == TransactionSynchronization.STATUS_COMMITTED) { - queues.get(queue.name).ping(); - } - } - }); - - return task; - } - - @Transactional - public Task await(Task task, long timeout) { - final long start = currentTimeMillis(); - final long end = start + timeout; - - while (currentTimeMillis() < end) { - task = update(task); - - try { - sleep(100); - } catch (InterruptedException e) { - // break - } - } - - return task; - } - - @Transactional(readOnly = true) - public Task update(Task ref) { - return taskDao.findById(ref.id); - } -} -- cgit v1.2.3