From a03d5154456587fc7920e632f083cc5f1e4318a9 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 20 Apr 2013 17:29:18 +0200 Subject: wip --- .../java/io/trygvis/queue/JdbcAsyncService.java | 115 ++++----------------- 1 file changed, 21 insertions(+), 94 deletions(-) (limited to 'src/main/java/io/trygvis/queue/JdbcAsyncService.java') diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java index 1df0ab6..a8f581e 100644 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -4,7 +4,6 @@ import org.quartz.*; import org.slf4j.*; import org.springframework.beans.factory.annotation.*; import org.springframework.stereotype.*; -import org.springframework.transaction.*; import org.springframework.transaction.annotation.*; import org.springframework.transaction.support.*; @@ -43,7 +42,7 @@ public class JdbcAsyncService implements AsyncService { final long interval_; if (q == null) { - q = new Queue(name, interval * 1000); + q = new Queue(name, interval); queueDao.insert(q); interval_ = interval; } else { @@ -51,18 +50,19 @@ public class JdbcAsyncService implements AsyncService { interval_ = q.interval; } - final QueueThread queueThread = new QueueThread(q, callable); + final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); queues.put(name, queueThread); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { - log.info("status = {}", status); + log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } - }, 1000, 1000 * interval_, MILLISECONDS); + }, 10, interval_, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); @@ -85,9 +85,7 @@ public class JdbcAsyncService implements AsyncService { } @Transactional(propagation = REQUIRED) - public Task schedule(Queue queue, String... args) { - log.info("schedule: ENTER"); - + public Task schedule(final Queue queue, String... args) { Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); @@ -97,15 +95,22 @@ public class JdbcAsyncService implements AsyncService { long id = taskDao.insert(queue.name, scheduled, arguments.toString()); Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); - log.info("task = {}", task); - queues.get(queue.name).ping(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } + log.info("Created task = {}", task); +// queues.get(queue.name).ping(); +// try { +// Thread.sleep(500); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queues.get(queue.name).ping(); + } + } + }); - log.info("schedule: LEAVE"); return task; } @@ -113,82 +118,4 @@ public class JdbcAsyncService implements AsyncService { public Task update(Task ref) { return taskDao.findById(ref.id); } - - class QueueThread implements Runnable { - public boolean shouldRun = true; - - public final Queue queue; - - private final AsyncCallable callable; - - QueueThread(Queue queue, AsyncCallable callable) { - this.queue = queue; - this.callable = callable; - } - - public void ping() { - log.info("Sending ping to " + queue); - synchronized (this) { - notify(); - } - } - - public void run() { - while (shouldRun) { - List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - - synchronized (this) { - try { - wait(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - taskDao.update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } } -- cgit v1.2.3