diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-20 17:29:18 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-20 17:31:02 +0200 |
commit | a03d5154456587fc7920e632f083cc5f1e4318a9 (patch) | |
tree | 08cddc03fc61aae0cfd4deb08bd8f99aca19bd40 /src/main/java/io/trygvis/queue | |
parent | 637dddf11f5d60b35c9696914e1e2658b2ddc611 (diff) | |
download | quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.tar.gz quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.tar.bz2 quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.tar.xz quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.zip |
wip
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rwxr-xr-x | src/main/java/io/trygvis/queue/AsyncService.java | 58 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcAsyncService.java | 115 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueThread.java | 116 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 11 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskFailureException.java | 7 |
5 files changed, 184 insertions, 123 deletions
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index 10f1b79..b42b550 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -1,28 +1,30 @@ -package io.trygvis.queue;
-
-import org.quartz.*;
-
-public interface AsyncService {
-
- /**
- * @param name
- * @param interval how often the queue should be polled for missed tasks in seconds.
- * @param callable
- * @return
- * @throws SchedulerException
- */
- Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
-
- Queue getQueue(String name);
-
- Task schedule(Queue queue, String... args);
-
- /**
- * Polls for a new state of the execution.
- */
- Task update(Task ref);
-
- interface AsyncCallable {
- void run() throws Exception;
- }
-}
+package io.trygvis.queue; + +import org.quartz.*; + +import java.util.List; + +public interface AsyncService { + + /** + * @param name + * @param interval how often the queue should be polled for missed tasks in seconds. + * @param callable + * @return + * @throws SchedulerException + */ + Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + + Queue getQueue(String name); + + Task schedule(Queue queue, String... args); + + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); + + interface AsyncCallable { + void run(List<String> arguments) throws Exception; + } +} diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java index 1df0ab6..a8f581e 100644 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -4,7 +4,6 @@ import org.quartz.*; import org.slf4j.*; import org.springframework.beans.factory.annotation.*; import org.springframework.stereotype.*; -import org.springframework.transaction.*; import org.springframework.transaction.annotation.*; import org.springframework.transaction.support.*; @@ -43,7 +42,7 @@ public class JdbcAsyncService implements AsyncService { final long interval_; if (q == null) { - q = new Queue(name, interval * 1000); + q = new Queue(name, interval); queueDao.insert(q); interval_ = interval; } else { @@ -51,18 +50,19 @@ public class JdbcAsyncService implements AsyncService { interval_ = q.interval; } - final QueueThread queueThread = new QueueThread(q, callable); + final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); queues.put(name, queueThread); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { - log.info("status = {}", status); + log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } - }, 1000, 1000 * interval_, MILLISECONDS); + }, 10, interval_, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); @@ -85,9 +85,7 @@ public class JdbcAsyncService implements AsyncService { } @Transactional(propagation = REQUIRED) - public Task schedule(Queue queue, String... args) { - log.info("schedule: ENTER"); - + public Task schedule(final Queue queue, String... args) { Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); @@ -97,15 +95,22 @@ public class JdbcAsyncService implements AsyncService { long id = taskDao.insert(queue.name, scheduled, arguments.toString()); Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); - log.info("task = {}", task); - queues.get(queue.name).ping(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } + log.info("Created task = {}", task); +// queues.get(queue.name).ping(); +// try { +// Thread.sleep(500); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queues.get(queue.name).ping(); + } + } + }); - log.info("schedule: LEAVE"); return task; } @@ -113,82 +118,4 @@ public class JdbcAsyncService implements AsyncService { public Task update(Task ref) { return taskDao.findById(ref.id); } - - class QueueThread implements Runnable { - public boolean shouldRun = true; - - public final Queue queue; - - private final AsyncCallable callable; - - QueueThread(Queue queue, AsyncCallable callable) { - this.queue = queue; - this.callable = callable; - } - - public void ping() { - log.info("Sending ping to " + queue); - synchronized (this) { - notify(); - } - } - - public void run() { - while (shouldRun) { - List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - - synchronized (this) { - try { - wait(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - taskDao.update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } } diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java new file mode 100644 index 0000000..a981c7e --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueThread.java @@ -0,0 +1,116 @@ +package io.trygvis.queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.List; + +class QueueThread implements Runnable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean busy; + + public final Queue queue; + + private final TaskDao taskDao; + + private final TransactionTemplate transactionTemplate; + + private final AsyncService.AsyncCallable callable; + + QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { + this.queue = queue; + this.taskDao = taskDao; + this.transactionTemplate = transactionTemplate; + this.callable = callable; + } + + public void ping() { + synchronized (this) { + if (!busy) { + log.info("Sending ping to " + queue); + notify(); + } else { + checkForNewTasks = true; + } + } + } + + public void run() { + while (shouldRun) { + try { + List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { + public List<Task> doInTransaction(TransactionStatus status) { + return taskDao.findByNameAndCompletedIsNull(queue.name); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + if(tasks.size() > 0) { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } + } catch (Throwable e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + busy = false; + + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + continue; + } + + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + + busy = true; + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(task.arguments); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 2e407a5..2bf2145 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -3,12 +3,16 @@ package io.trygvis.queue; import org.springframework.beans.factory.annotation.*; import org.springframework.jdbc.core.*; import org.springframework.stereotype.*; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import java.sql.*; import java.util.Date; import java.util.*; import static java.util.Arrays.*; +import static org.springframework.transaction.annotation.Propagation.MANDATORY; +import static org.springframework.transaction.annotation.Propagation.REQUIRED; @Component public class TaskDao { @@ -16,22 +20,26 @@ public class TaskDao { @Autowired private JdbcTemplate jdbcTemplate; + @Transactional(propagation = MANDATORY) public long insert(String queue, Date scheduled, String arguments) { jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " + "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments); return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); } + @Transactional(propagation = MANDATORY) public Task findById(long id) { return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", new TaskRowMapper(), id); } + @Transactional(propagation = MANDATORY) public List<Task> findByNameAndCompletedIsNull(String name) { return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", new TaskRowMapper(), name); } + @Transactional(propagation = MANDATORY) public void update(Task task) { jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", task.scheduled, task.lastRun, task.runCount, task.completed, task.id); @@ -41,6 +49,7 @@ public class TaskDao { public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments"; public Task mapRow(ResultSet rs, int rowNum) throws SQLException { + String arguments = rs.getString(7); return new Task( rs.getLong(1), rs.getString(2), @@ -48,7 +57,7 @@ public class TaskDao { rs.getTimestamp(4), rs.getInt(5), rs.getTimestamp(6), - asList(rs.getString(7).split(" "))); + arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList()); } } } diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/queue/TaskFailureException.java new file mode 100644 index 0000000..d3d8c48 --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } +} |