aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcAsyncService.java115
1 files changed, 21 insertions, 94 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
index 1df0ab6..a8f581e 100644
--- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
@@ -4,7 +4,6 @@ import org.quartz.*;
import org.slf4j.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.stereotype.*;
-import org.springframework.transaction.*;
import org.springframework.transaction.annotation.*;
import org.springframework.transaction.support.*;
@@ -43,7 +42,7 @@ public class JdbcAsyncService implements AsyncService {
final long interval_;
if (q == null) {
- q = new Queue(name, interval * 1000);
+ q = new Queue(name, interval);
queueDao.insert(q);
interval_ = interval;
} else {
@@ -51,18 +50,19 @@ public class JdbcAsyncService implements AsyncService {
interval_ = q.interval;
}
- final QueueThread queueThread = new QueueThread(q, callable);
+ final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable);
queues.put(name, queueThread);
registerSynchronization(new TransactionSynchronizationAdapter() {
public void afterCompletion(int status) {
- log.info("status = {}", status);
+ log.info("Transaction completed with status = {}", status);
if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
queueThread.ping();
}
- }, 1000, 1000 * interval_, MILLISECONDS);
+ }, 10, interval_, SECONDS);
Thread thread = new Thread(queueThread, name);
thread.setDaemon(true);
thread.start();
@@ -85,9 +85,7 @@ public class JdbcAsyncService implements AsyncService {
}
@Transactional(propagation = REQUIRED)
- public Task schedule(Queue queue, String... args) {
- log.info("schedule: ENTER");
-
+ public Task schedule(final Queue queue, String... args) {
Date scheduled = new Date();
StringBuilder arguments = new StringBuilder();
@@ -97,15 +95,22 @@ public class JdbcAsyncService implements AsyncService {
long id = taskDao.insert(queue.name, scheduled, arguments.toString());
Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args));
- log.info("task = {}", task);
- queues.get(queue.name).ping();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ log.info("Created task = {}", task);
+// queues.get(queue.name).ping();
+// try {
+// Thread.sleep(500);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ queues.get(queue.name).ping();
+ }
+ }
+ });
- log.info("schedule: LEAVE");
return task;
}
@@ -113,82 +118,4 @@ public class JdbcAsyncService implements AsyncService {
public Task update(Task ref) {
return taskDao.findById(ref.id);
}
-
- class QueueThread implements Runnable {
- public boolean shouldRun = true;
-
- public final Queue queue;
-
- private final AsyncCallable callable;
-
- QueueThread(Queue queue, AsyncCallable callable) {
- this.queue = queue;
- this.callable = callable;
- }
-
- public void ping() {
- log.info("Sending ping to " + queue);
- synchronized (this) {
- notify();
- }
- }
-
- public void run() {
- while (shouldRun) {
- List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
-
- log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
-
- try {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- } catch (Exception e) {
- log.warn("Error while executing tasks.", e);
- }
-
- synchronized (this) {
- try {
- wait();
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- taskDao.update(task.registerRun());
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run();
- Date completed = new Date();
- Task t = task.registerComplete(completed);
- log.info("Completed task: {}", t);
- taskDao.update(t);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
- }
-
- private static class TaskFailureException extends RuntimeException {
- public TaskFailureException(Exception e) {
- super(e);
- }
- }
}