From 7d704feb86c44fca57941d223e8605b55fcf68f0 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 29 May 2013 22:16:50 +0200 Subject: o Splitting out the parts that implement the "async" features vs the "queue" features. --- src/main/java/io/trygvis/queue/AsyncService.java | 32 ----- .../java/io/trygvis/queue/JdbcAsyncService.java | 149 --------------------- src/main/java/io/trygvis/queue/QueueThread.java | 116 ---------------- src/main/java/io/trygvis/queue/Task.java | 2 +- .../io/trygvis/queue/TaskFailureException.java | 7 - 5 files changed, 1 insertion(+), 305 deletions(-) delete mode 100755 src/main/java/io/trygvis/queue/AsyncService.java delete mode 100644 src/main/java/io/trygvis/queue/JdbcAsyncService.java delete mode 100644 src/main/java/io/trygvis/queue/QueueThread.java delete mode 100644 src/main/java/io/trygvis/queue/TaskFailureException.java (limited to 'src/main/java/io/trygvis/queue') diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java deleted file mode 100755 index c9e5861..0000000 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.trygvis.queue; - -import org.quartz.SchedulerException; - -import java.util.List; - -public interface AsyncService { - - /** - * @param name - * @param interval how often the queue should be polled for missed tasks in seconds. - * @param callable - * @return - * @throws SchedulerException - */ - Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; - - Queue getQueue(String name); - - Task schedule(Queue queue, String... args); - - Task schedule(long parent, Queue queue, String... args); - - /** - * Polls for a new state of the execution. - */ - Task update(Task ref); - - interface AsyncCallable { - void run(List arguments) throws Exception; - } -} diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java deleted file mode 100644 index 276541f..0000000 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ /dev/null @@ -1,149 +0,0 @@ -package io.trygvis.queue; - -import org.quartz.SchedulerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static java.lang.System.currentTimeMillis; -import static java.lang.Thread.sleep; -import static java.util.Arrays.asList; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; - -@Component -public class JdbcAsyncService implements AsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - private final Map queues = new HashMap<>(); - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueDao queueDao; - - @Autowired - private TaskDao taskDao; - - @Transactional(propagation = REQUIRED) - public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { - log.info("registerQueue: ENTER"); - - Queue q = queueDao.findByName(name); - - log.info("q = {}", q); - - final long interval_; - if (q == null) { - q = new Queue(name, interval); - queueDao.insert(q); - interval_ = interval; - } else { - // Found an existing queue. Use the Settings from the database. - interval_ = q.interval; - } - - final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); - queues.put(name, queueThread); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - log.info("Transaction completed with status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - log.info("Starting thread for queue {} with poll interval = {}s", name, interval); - executor.scheduleAtFixedRate(new Runnable() { - public void run() { - queueThread.ping(); - } - }, 10, interval_, SECONDS); - Thread thread = new Thread(queueThread, name); - thread.setDaemon(true); - thread.start(); - } - } - }); - - log.info("registerQueue: LEAVE"); - return q; - } - - public Queue getQueue(String name) { - QueueThread queueThread = queues.get(name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + name + "'."); - } - - return queueThread.queue; - } - - @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, String... args) { - return scheduleInner(null, queue, args); - } - - public Task schedule(long parent, Queue queue, String... args) { - return scheduleInner(parent, queue, args); - } - - private Task scheduleInner(Long parent, final Queue queue, String... args) { - Date scheduled = new Date(); - - StringBuilder arguments = new StringBuilder(); - for (String arg : args) { - arguments.append(arg).append(' '); - } - - long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); - Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); - log.info("Created task = {}", task); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - if (status == TransactionSynchronization.STATUS_COMMITTED) { - queues.get(queue.name).ping(); - } - } - }); - - return task; - } - - @Transactional - public Task await(Task task, long timeout) { - final long start = currentTimeMillis(); - final long end = start + timeout; - - while (currentTimeMillis() < end) { - task = update(task); - - try { - sleep(100); - } catch (InterruptedException e) { - // break - } - } - - return task; - } - - @Transactional(readOnly = true) - public Task update(Task ref) { - return taskDao.findById(ref.id); - } -} diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java deleted file mode 100644 index a981c7e..0000000 --- a/src/main/java/io/trygvis/queue/QueueThread.java +++ /dev/null @@ -1,116 +0,0 @@ -package io.trygvis.queue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.Date; -import java.util.List; - -class QueueThread implements Runnable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - public boolean shouldRun = true; - - private boolean checkForNewTasks; - - private boolean busy; - - public final Queue queue; - - private final TaskDao taskDao; - - private final TransactionTemplate transactionTemplate; - - private final AsyncService.AsyncCallable callable; - - QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { - this.queue = queue; - this.taskDao = taskDao; - this.transactionTemplate = transactionTemplate; - this.callable = callable; - } - - public void ping() { - synchronized (this) { - if (!busy) { - log.info("Sending ping to " + queue); - notify(); - } else { - checkForNewTasks = true; - } - } - } - - public void run() { - while (shouldRun) { - try { - List tasks = transactionTemplate.execute(new TransactionCallback>() { - public List doInTransaction(TransactionStatus status) { - return taskDao.findByNameAndCompletedIsNull(queue.name); - } - }); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - if(tasks.size() > 0) { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } - } catch (Throwable e) { - log.warn("Error while executing tasks.", e); - } - - synchronized (this) { - busy = false; - - if (checkForNewTasks) { - log.info("Ping received!"); - checkForNewTasks = false; - continue; - } - - try { - wait(); - } catch (InterruptedException e) { - // ignore - } - - busy = true; - } - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(task.arguments); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - taskDao.update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } -} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java index 09d5060..2b1103b 100755 --- a/src/main/java/io/trygvis/queue/Task.java +++ b/src/main/java/io/trygvis/queue/Task.java @@ -21,7 +21,7 @@ public class Task { public final List arguments; - Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List arguments) { + public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List arguments) { this.id = id; this.parent = parent; this.queue = queue; diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/queue/TaskFailureException.java deleted file mode 100644 index d3d8c48..0000000 --- a/src/main/java/io/trygvis/queue/TaskFailureException.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.trygvis.queue; - -class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } -} -- cgit v1.2.3