From 637dddf11f5d60b35c9696914e1e2658b2ddc611 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 20 Apr 2013 15:43:01 +0200 Subject: wip --- .../java/io/trygvis/queue/JdbcAsyncService.java | 194 +++++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 src/main/java/io/trygvis/queue/JdbcAsyncService.java (limited to 'src/main/java/io/trygvis/queue/JdbcAsyncService.java') diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java new file mode 100644 index 0000000..1df0ab6 --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -0,0 +1,194 @@ +package io.trygvis.queue; + +import org.quartz.*; +import org.slf4j.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.annotation.*; +import org.springframework.transaction.support.*; + +import java.util.*; +import java.util.concurrent.*; + +import static java.util.Arrays.*; +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.annotation.Propagation.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; + +@Component +public class JdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final Map queues = new HashMap<>(); + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueDao queueDao; + + @Autowired + private TaskDao taskDao; + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueDao.findByName(name); + + log.info("q = {}", q); + + final long interval_; + if (q == null) { + q = new Queue(name, interval * 1000); + queueDao.insert(q); + interval_ = interval; + } else { + // Found an existing queue. Use the Settings from the database. + interval_ = q.interval; + } + + final QueueThread queueThread = new QueueThread(q, callable); + queues.put(name, queueThread); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 1000, 1000 * interval_, MILLISECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + + public Queue getQueue(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + + return queueThread.queue; + } + + @Transactional(propagation = REQUIRED) + public Task schedule(Queue queue, String... args) { + log.info("schedule: ENTER"); + + Date scheduled = new Date(); + + StringBuilder arguments = new StringBuilder(); + for (String arg : args) { + arguments.append(arg).append(' '); + } + + long id = taskDao.insert(queue.name, scheduled, arguments.toString()); + Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); + log.info("task = {}", task); + queues.get(queue.name).ping(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("schedule: LEAVE"); + return task; + } + + @Transactional(readOnly = true) + public Task update(Task ref) { + return taskDao.findById(ref.id); + } + + class QueueThread implements Runnable { + public boolean shouldRun = true; + + public final Queue queue; + + private final AsyncCallable callable; + + QueueThread(Queue queue, AsyncCallable callable) { + this.queue = queue; + this.callable = callable; + } + + public void ping() { + log.info("Sending ping to " + queue); + synchronized (this) { + notify(); + } + } + + public void run() { + while (shouldRun) { + List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } + } + + private static class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } + } +} -- cgit v1.2.3