From 7d704feb86c44fca57941d223e8605b55fcf68f0 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 29 May 2013 22:16:50 +0200 Subject: o Splitting out the parts that implement the "async" features vs the "queue" features. --- JpaAsyncService.java | 242 --------------------- .../java/io/trygvis/CreateArticleCallable.java | 2 +- src/main/java/io/trygvis/Main.java | 2 +- .../java/io/trygvis/UpdateArticleCallable.java | 2 +- src/main/java/io/trygvis/async/AsyncService.java | 37 ++++ .../java/io/trygvis/async/JdbcAsyncService.java | 153 +++++++++++++ src/main/java/io/trygvis/async/QueueThread.java | 119 ++++++++++ .../io/trygvis/async/TaskFailureException.java | 7 + src/main/java/io/trygvis/queue/AsyncService.java | 32 --- .../java/io/trygvis/queue/JdbcAsyncService.java | 149 ------------- src/main/java/io/trygvis/queue/QueueThread.java | 116 ---------- src/main/java/io/trygvis/queue/Task.java | 2 +- .../io/trygvis/queue/TaskFailureException.java | 7 - 13 files changed, 320 insertions(+), 550 deletions(-) delete mode 100755 JpaAsyncService.java create mode 100755 src/main/java/io/trygvis/async/AsyncService.java create mode 100644 src/main/java/io/trygvis/async/JdbcAsyncService.java create mode 100644 src/main/java/io/trygvis/async/QueueThread.java create mode 100644 src/main/java/io/trygvis/async/TaskFailureException.java delete mode 100755 src/main/java/io/trygvis/queue/AsyncService.java delete mode 100644 src/main/java/io/trygvis/queue/JdbcAsyncService.java delete mode 100644 src/main/java/io/trygvis/queue/QueueThread.java delete mode 100644 src/main/java/io/trygvis/queue/TaskFailureException.java diff --git a/JpaAsyncService.java b/JpaAsyncService.java deleted file mode 100755 index eaef782..0000000 --- a/JpaAsyncService.java +++ /dev/null @@ -1,242 +0,0 @@ -package io.trygvis.queue; - -import static java.util.concurrent.TimeUnit.*; -import static org.springframework.transaction.support.TransactionSynchronizationManager.*; - -@SuppressWarnings("SpringJavaAutowiringInspection") -@Component -public class JpaAsyncService implements AsyncService/*, ApplicationContextAware*/ { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - @PersistenceContext - private EntityManager entityManager; - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueRepository queueRepository; - - @Autowired - private TaskRepository taskRepository; - - @Transactional - public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { - log.info("registerQueue: ENTER"); - - Queue q = queueRepository.findByName(name); - - log.info("q = {}", q); - - if (q == null) { - q = new Queue(name, interval); - q = queueRepository.save(q); - } else { - boolean dirty = false; - if (interval != q.getInterval()) { - q.setInterval(interval); - dirty = true; - } - - if (dirty) { - q = queueRepository.save(q); - } - } - - log.info("q = {}", q); - entityManager.flush(); -// entityManager.detach(q); - log.info("q = {}", q); - - JpaQueueRef queueRef = new JpaQueueRef(q); - - log.info("registerQueue: LEAVE"); - - registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef)); - - return queueRef; - } - - @Transactional(readOnly = true) - public JpaQueueRef getQueue(String name) { - Queue queue = queueRepository.findByName(name); - - if (queue == null) { - throw new RollbackException("No such queue: '" + name + "'."); - } - - entityManager.detach(queue); - - return new JpaQueueRef(queue); - } - - @Transactional - public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { - log.info("schedule: ENTER"); - Date scheduled = new Date(); - Task task = new Task(queue.queue, scheduled, args); - log.info("task = {}", task); - taskRepository.save(task); - log.info("task = {}", task); -// entityManager.detach(task); - log.info("schedule: LEAVE"); - return new JpaExecutionRef(task); - } - - @Transactional(readOnly = true) - public JpaExecutionRef update(JpaExecutionRef ref) { - return new JpaExecutionRef(taskRepository.findOne(ref.task.getId())); - } - - public static class JpaQueueRef implements AsyncService.QueueRef { - public final Queue queue; - - JpaQueueRef(Queue queue) { - this.queue = queue; - } - - public String toString() { - return "JpaQueueRef{" + - "queue=" + queue + - '}'; - } - } - - public static class JpaExecutionRef implements AsyncService.ExecutionRef { - private final Task task; - - public JpaExecutionRef(Task task) { - this.task = task; - } - - public List getArguments() { - return Arrays.asList(task.getArguments()); - } - - public Date getScheduled() { - return task.getScheduled(); - } - - public Date getLastRun() { - return task.getLastRun(); - } - - public Date getCompleted() { - return task.getCompleted(); - } - - public boolean isDone() { - return task.isDone(); - } - - public String toString() { - return "JpaExecutionRef{" + - "task=" + task + - '}'; - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } - - private class CheckTimerTask implements Runnable { - private final AsyncCallable callable; - private final JpaQueueRef queueRef; - - private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { - this.callable = callable; - this.queueRef = queueRef; - } - - public void run() { - log.info("JpaAsyncService$CheckTimerTask.run"); - - List tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); - - System.out.println("tasks.size() = " + tasks.size()); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - task.registerRun(); - taskRepository.save(task); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - log.info("Setting completed on task. date = {}, task = {}", completed, task); - task.registerComplete(completed); - taskRepository.save(task); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private class MyTransactionSynchronization implements TransactionSynchronization { - - private final AsyncCallable callable; - - private final int interval; - - private final JpaQueueRef queueRef; - - public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { - this.callable = callable; - this.interval = interval; - this.queueRef = queueRef; - } - - public void suspend() { - } - - public void resume() { - } - - public void flush() { - } - - public void beforeCommit(boolean readOnly) { - } - - public void beforeCompletion() { - } - - public void afterCommit() { - } - - public void afterCompletion(int status) { - log.info("status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); - } - } - } -} diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java index 420a5b5..471b59d 100755 --- a/src/main/java/io/trygvis/CreateArticleCallable.java +++ b/src/main/java/io/trygvis/CreateArticleCallable.java @@ -1,7 +1,7 @@ package io.trygvis; import io.trygvis.model.Article; -import io.trygvis.queue.AsyncService; +import io.trygvis.async.AsyncService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index d22cab2..08b9b75 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -1,6 +1,6 @@ package io.trygvis; -import io.trygvis.queue.AsyncService; +import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; import org.hibernate.dialect.PostgreSQL82Dialect; diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java index f1ea0e2..a910855 100755 --- a/src/main/java/io/trygvis/UpdateArticleCallable.java +++ b/src/main/java/io/trygvis/UpdateArticleCallable.java @@ -1,6 +1,6 @@ package io.trygvis; -import io.trygvis.queue.AsyncService; +import io.trygvis.async.AsyncService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java new file mode 100755 index 0000000..e90a0e4 --- /dev/null +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -0,0 +1,37 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import org.quartz.SchedulerException; + +import java.util.List; + +/** + * A simple framework for running tasks. + */ +public interface AsyncService { + + /** + * @param name + * @param interval how often the queue should be polled for missed tasks in seconds. + * @param callable + * @return + * @throws SchedulerException + */ + Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + + Queue getQueue(String name); + + Task schedule(Queue queue, String... args); + + Task schedule(long parent, Queue queue, String... args); + + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); + + interface AsyncCallable { + void run(List arguments) throws Exception; + } +} diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..4e78a37 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,153 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueDao; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +@Component +public class JdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final Map queues = new HashMap<>(); + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueDao queueDao; + + @Autowired + private TaskDao taskDao; + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueDao.findByName(name); + + log.info("q = {}", q); + + final long interval_; + if (q == null) { + q = new Queue(name, interval); + queueDao.insert(q); + interval_ = interval; + } else { + // Found an existing queue. Use the Settings from the database. + interval_ = q.interval; + } + + final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); + queues.put(name, queueThread); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 10, interval_, SECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + + public Queue getQueue(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + + return queueThread.queue; + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, String... args) { + return scheduleInner(null, queue, args); + } + + public Task schedule(long parent, Queue queue, String... args) { + return scheduleInner(parent, queue, args); + } + + private Task scheduleInner(Long parent, final Queue queue, String... args) { + Date scheduled = new Date(); + + StringBuilder arguments = new StringBuilder(); + for (String arg : args) { + arguments.append(arg).append(' '); + } + + long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); + Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); + log.info("Created task = {}", task); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queues.get(queue.name).ping(); + } + } + }); + + return task; + } + + @Transactional + public Task await(Task task, long timeout) { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(task); + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + + @Transactional(readOnly = true) + public Task update(Task ref) { + return taskDao.findById(ref.id); + } +} diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java new file mode 100644 index 0000000..69466df --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -0,0 +1,119 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.List; + +class QueueThread implements Runnable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean busy; + + public final Queue queue; + + private final TaskDao taskDao; + + private final TransactionTemplate transactionTemplate; + + private final AsyncService.AsyncCallable callable; + + QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { + this.queue = queue; + this.taskDao = taskDao; + this.transactionTemplate = transactionTemplate; + this.callable = callable; + } + + public void ping() { + synchronized (this) { + if (!busy) { + log.info("Sending ping to " + queue); + notify(); + } else { + checkForNewTasks = true; + } + } + } + + public void run() { + while (shouldRun) { + try { + List tasks = transactionTemplate.execute(new TransactionCallback>() { + public List doInTransaction(TransactionStatus status) { + return taskDao.findByNameAndCompletedIsNull(queue.name); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + if(tasks.size() > 0) { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } + } catch (Throwable e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + busy = false; + + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + continue; + } + + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + + busy = true; + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(task.arguments); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } +} diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java new file mode 100644 index 0000000..7278e17 --- /dev/null +++ b/src/main/java/io/trygvis/async/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.async; + +class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } +} diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java deleted file mode 100755 index c9e5861..0000000 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.trygvis.queue; - -import org.quartz.SchedulerException; - -import java.util.List; - -public interface AsyncService { - - /** - * @param name - * @param interval how often the queue should be polled for missed tasks in seconds. - * @param callable - * @return - * @throws SchedulerException - */ - Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; - - Queue getQueue(String name); - - Task schedule(Queue queue, String... args); - - Task schedule(long parent, Queue queue, String... args); - - /** - * Polls for a new state of the execution. - */ - Task update(Task ref); - - interface AsyncCallable { - void run(List arguments) throws Exception; - } -} diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java deleted file mode 100644 index 276541f..0000000 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ /dev/null @@ -1,149 +0,0 @@ -package io.trygvis.queue; - -import org.quartz.SchedulerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static java.lang.System.currentTimeMillis; -import static java.lang.Thread.sleep; -import static java.util.Arrays.asList; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; - -@Component -public class JdbcAsyncService implements AsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - private final Map queues = new HashMap<>(); - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueDao queueDao; - - @Autowired - private TaskDao taskDao; - - @Transactional(propagation = REQUIRED) - public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { - log.info("registerQueue: ENTER"); - - Queue q = queueDao.findByName(name); - - log.info("q = {}", q); - - final long interval_; - if (q == null) { - q = new Queue(name, interval); - queueDao.insert(q); - interval_ = interval; - } else { - // Found an existing queue. Use the Settings from the database. - interval_ = q.interval; - } - - final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); - queues.put(name, queueThread); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - log.info("Transaction completed with status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - log.info("Starting thread for queue {} with poll interval = {}s", name, interval); - executor.scheduleAtFixedRate(new Runnable() { - public void run() { - queueThread.ping(); - } - }, 10, interval_, SECONDS); - Thread thread = new Thread(queueThread, name); - thread.setDaemon(true); - thread.start(); - } - } - }); - - log.info("registerQueue: LEAVE"); - return q; - } - - public Queue getQueue(String name) { - QueueThread queueThread = queues.get(name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + name + "'."); - } - - return queueThread.queue; - } - - @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, String... args) { - return scheduleInner(null, queue, args); - } - - public Task schedule(long parent, Queue queue, String... args) { - return scheduleInner(parent, queue, args); - } - - private Task scheduleInner(Long parent, final Queue queue, String... args) { - Date scheduled = new Date(); - - StringBuilder arguments = new StringBuilder(); - for (String arg : args) { - arguments.append(arg).append(' '); - } - - long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); - Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); - log.info("Created task = {}", task); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - if (status == TransactionSynchronization.STATUS_COMMITTED) { - queues.get(queue.name).ping(); - } - } - }); - - return task; - } - - @Transactional - public Task await(Task task, long timeout) { - final long start = currentTimeMillis(); - final long end = start + timeout; - - while (currentTimeMillis() < end) { - task = update(task); - - try { - sleep(100); - } catch (InterruptedException e) { - // break - } - } - - return task; - } - - @Transactional(readOnly = true) - public Task update(Task ref) { - return taskDao.findById(ref.id); - } -} diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java deleted file mode 100644 index a981c7e..0000000 --- a/src/main/java/io/trygvis/queue/QueueThread.java +++ /dev/null @@ -1,116 +0,0 @@ -package io.trygvis.queue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.Date; -import java.util.List; - -class QueueThread implements Runnable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - public boolean shouldRun = true; - - private boolean checkForNewTasks; - - private boolean busy; - - public final Queue queue; - - private final TaskDao taskDao; - - private final TransactionTemplate transactionTemplate; - - private final AsyncService.AsyncCallable callable; - - QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { - this.queue = queue; - this.taskDao = taskDao; - this.transactionTemplate = transactionTemplate; - this.callable = callable; - } - - public void ping() { - synchronized (this) { - if (!busy) { - log.info("Sending ping to " + queue); - notify(); - } else { - checkForNewTasks = true; - } - } - } - - public void run() { - while (shouldRun) { - try { - List tasks = transactionTemplate.execute(new TransactionCallback>() { - public List doInTransaction(TransactionStatus status) { - return taskDao.findByNameAndCompletedIsNull(queue.name); - } - }); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - if(tasks.size() > 0) { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } - } catch (Throwable e) { - log.warn("Error while executing tasks.", e); - } - - synchronized (this) { - busy = false; - - if (checkForNewTasks) { - log.info("Ping received!"); - checkForNewTasks = false; - continue; - } - - try { - wait(); - } catch (InterruptedException e) { - // ignore - } - - busy = true; - } - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(task.arguments); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - taskDao.update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } -} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java index 09d5060..2b1103b 100755 --- a/src/main/java/io/trygvis/queue/Task.java +++ b/src/main/java/io/trygvis/queue/Task.java @@ -21,7 +21,7 @@ public class Task { public final List arguments; - Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List arguments) { + public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List arguments) { this.id = id; this.parent = parent; this.queue = queue; diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/queue/TaskFailureException.java deleted file mode 100644 index d3d8c48..0000000 --- a/src/main/java/io/trygvis/queue/TaskFailureException.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.trygvis.queue; - -class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } -} -- cgit v1.2.3