From 637dddf11f5d60b35c9696914e1e2658b2ddc611 Mon Sep 17 00:00:00 2001
From: Trygve Laugstøl <trygvis@inamo.no>
Date: Sat, 20 Apr 2013 15:43:01 +0200
Subject: wip

---
 src/main/java/io/trygvis/Main.java                 |  45 ++--
 src/main/java/io/trygvis/data/QueueRepository.java |   2 +-
 src/main/java/io/trygvis/data/TaskRepository.java  |   4 +-
 src/main/java/io/trygvis/model/Queue.java          |  58 -----
 src/main/java/io/trygvis/model/Task.java           |  95 --------
 src/main/java/io/trygvis/queue/AsyncService.java   |  37 ++-
 .../java/io/trygvis/queue/JdbcAsyncService.java    | 194 ++++++++++++++++
 .../java/io/trygvis/queue/JpaAsyncService.java     | 257 ---------------------
 src/main/java/io/trygvis/queue/Queue.java          |  24 ++
 src/main/java/io/trygvis/queue/QueueDao.java       |  34 +++
 src/main/java/io/trygvis/queue/Task.java           |  55 +++++
 src/main/java/io/trygvis/queue/TaskDao.java        |  54 +++++
 src/main/java/io/trygvis/spring/Config.java        |  14 +-
 13 files changed, 419 insertions(+), 454 deletions(-)
 delete mode 100755 src/main/java/io/trygvis/model/Queue.java
 delete mode 100755 src/main/java/io/trygvis/model/Task.java
 create mode 100644 src/main/java/io/trygvis/queue/JdbcAsyncService.java
 delete mode 100755 src/main/java/io/trygvis/queue/JpaAsyncService.java
 create mode 100755 src/main/java/io/trygvis/queue/Queue.java
 create mode 100644 src/main/java/io/trygvis/queue/QueueDao.java
 create mode 100755 src/main/java/io/trygvis/queue/Task.java
 create mode 100644 src/main/java/io/trygvis/queue/TaskDao.java

(limited to 'src/main/java')

diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java
index 0a31aaa..f2f540f 100755
--- a/src/main/java/io/trygvis/Main.java
+++ b/src/main/java/io/trygvis/Main.java
@@ -1,17 +1,21 @@
 package io.trygvis;
 
 import io.trygvis.queue.*;
+import io.trygvis.queue.Queue;
 import org.hibernate.dialect.*;
 import org.slf4j.*;
 import org.slf4j.bridge.*;
 import org.springframework.beans.factory.annotation.*;
 import org.springframework.context.support.*;
 import org.springframework.stereotype.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
 
 import java.util.*;
 
 import static java.lang.System.*;
 import static java.lang.Thread.*;
+import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED;
 
 @Component
 public class Main {
@@ -48,7 +52,10 @@ public class Main {
     }
 
     @Autowired
-    private AsyncService<AsyncService.QueueRef, AsyncService.ExecutionRef> asyncService;
+    private TransactionTemplate transactionTemplate;
+
+    @Autowired
+    private AsyncService asyncService;
 
     @Autowired
     @Qualifier("createArticle")
@@ -61,35 +68,39 @@ public class Main {
     public void run() throws Exception {
         log.info("Main.run");
 
-        JpaAsyncService.JpaQueueRef queueRef = asyncService.registerQueue("create-queue", 10, createArticleCallable);
-        log.info("queue registered: ref = {}", queueRef);
+        final Queue q = asyncService.registerQueue("create-queue", 10, createArticleCallable);
+//        log.info("queue registered: ref = {}", q);
 //        asyncService.registerQueue("update-queue", 1, updateArticeCallable);
 
-        AsyncService.QueueRef queue = asyncService.getQueue("create-queue");
+//        q = asyncService.getQueue("create-queue");
 
-        List<AsyncService.ExecutionRef> refs = new ArrayList<>();
+        final List<Task> tasks = new ArrayList<>();
 
-        for (int i = 0; i < 10; i++) {
-            refs.add(asyncService.schedule(queue));
-        }
+        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+            protected void doInTransactionWithoutResult(TransactionStatus status) {
+                for (int i = 0; i < 1; i++) {
+                    tasks.add(asyncService.schedule(q));
+                }
+            }
+        });
 
         while (true) {
-            log.info("size = {}", refs.size());
-            for (Iterator<AsyncService.ExecutionRef> iterator = refs.iterator(); iterator.hasNext(); ) {
-                AsyncService.ExecutionRef ref = iterator.next();
+            sleep(10000);
+
+            log.info("tasks.size = {}", tasks.size());
+            for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
+                Task task = iterator.next();
 
-                ref = asyncService.update(ref);
+                task = asyncService.update(task);
 
-                log.info("ref = {}", ref);
+                log.info("task = {}", task);
 
-                if (ref.isDone()) {
+                if (task.isDone()) {
                     iterator.remove();
                 }
-
-                sleep(100);
             }
 
-            if (refs.isEmpty()) {
+            if (tasks.isEmpty()) {
                 break;
             }
         }
diff --git a/src/main/java/io/trygvis/data/QueueRepository.java b/src/main/java/io/trygvis/data/QueueRepository.java
index 143d747..47ed478 100755
--- a/src/main/java/io/trygvis/data/QueueRepository.java
+++ b/src/main/java/io/trygvis/data/QueueRepository.java
@@ -1,6 +1,6 @@
 package io.trygvis.data;
 
-import io.trygvis.model.*;
+import io.trygvis.queue.*;
 import org.springframework.data.jpa.repository.*;
 
 public interface QueueRepository extends JpaRepository<Queue, Long> {
diff --git a/src/main/java/io/trygvis/data/TaskRepository.java b/src/main/java/io/trygvis/data/TaskRepository.java
index e24d520..b0710e3 100755
--- a/src/main/java/io/trygvis/data/TaskRepository.java
+++ b/src/main/java/io/trygvis/data/TaskRepository.java
@@ -1,7 +1,7 @@
 package io.trygvis.data;
 
-import io.trygvis.model.*;
-import io.trygvis.model.Queue;
+import io.trygvis.queue.*;
+import io.trygvis.queue.Queue;
 import org.springframework.data.jpa.repository.*;
 
 import java.util.*;
diff --git a/src/main/java/io/trygvis/model/Queue.java b/src/main/java/io/trygvis/model/Queue.java
deleted file mode 100755
index aeec405..0000000
--- a/src/main/java/io/trygvis/model/Queue.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package io.trygvis.model;
-
-import javax.persistence.*;
-
-@Entity
-@Table(
-        uniqueConstraints = {
-                @UniqueConstraint(name = "uq_queue__name", columnNames = "name")
-        }
-)
-public class Queue {
-
-    @Id
-    @SequenceGenerator(name = "queue_seq", sequenceName = "queue_seq")
-    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "queue_seq")
-    private Integer id;
-
-    private String name;
-
-    private long interval;
-
-    @SuppressWarnings("UnusedDeclaration")
-    private Queue() {
-    }
-
-    public Queue(String name, long interval) {
-        this.name = name;
-        this.interval = interval;
-    }
-
-    public Integer getId() {
-        return id;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public long getInterval() {
-        return interval;
-    }
-
-    public void setInterval(long interval) {
-        this.interval = interval;
-    }
-
-    public String toString() {
-        return "Queue{" +
-                "id=" + id +
-                ", name='" + name + '\'' +
-                ", interval=" + interval +
-                '}';
-    }
-}
diff --git a/src/main/java/io/trygvis/model/Task.java b/src/main/java/io/trygvis/model/Task.java
deleted file mode 100755
index 24ac83d..0000000
--- a/src/main/java/io/trygvis/model/Task.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package io.trygvis.model;
-
-import javax.persistence.*;
-import java.util.*;
-import java.util.regex.*;
-
-@Entity
-public class Task {
-    @Id
-    @SequenceGenerator(name = "task_seq", sequenceName = "task_seq")
-    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "task_seq")
-    private Long id;
-
-    @ManyToOne
-    private Queue queue;
-
-    private Date scheduled;
-
-    private Date lastRun;
-
-    private int runCount;
-
-    private Date completed;
-
-    private String arguments;
-
-    private static final Pattern pattern = Pattern.compile(" ");
-
-    @SuppressWarnings("UnusedDeclaration")
-    private Task() {
-    }
-
-    public Task(Queue queue, Date scheduled, String... arguments) {
-        this.queue = queue;
-        this.scheduled = scheduled;
-
-        StringBuilder builder = new StringBuilder(arguments.length * 100);
-        for (String argument : arguments) {
-            if (pattern.matcher(argument).matches()) {
-                throw new RuntimeException("Bad argument: '" + argument + "'.");
-            }
-            builder.append(argument).append(' ');
-        }
-        this.arguments = builder.toString();
-    }
-
-    public Long getId() {
-        return id;
-    }
-
-    public String[] getArguments() {
-        return arguments.split(" ");
-    }
-
-    public Date getScheduled() {
-        return scheduled;
-    }
-
-    public Date getLastRun() {
-        return lastRun;
-    }
-
-    public int getRunCount() {
-        return runCount;
-    }
-
-    public Date getCompleted() {
-        return completed;
-    }
-
-    public boolean isDone() {
-        return completed != null;
-    }
-
-    public void registerRun() {
-        lastRun = new Date();
-        runCount++;
-    }
-
-    public void registerComplete(Date completed) {
-        this.completed = completed;
-    }
-
-    public String toString() {
-        return "Task{" +
-                "id=" + id +
-                ", queue=" + queue +
-                ", scheduled=" + scheduled +
-                ", lastRun=" + lastRun +
-                ", runCount=" + runCount +
-                ", completed=" + completed +
-                ", arguments='" + arguments + '\'' +
-                '}';
-    }
-}
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java
index b08db1f..10f1b79 100755
--- a/src/main/java/io/trygvis/queue/AsyncService.java
+++ b/src/main/java/io/trygvis/queue/AsyncService.java
@@ -2,32 +2,25 @@ package io.trygvis.queue;
 
 import org.quartz.*;
 
-import java.util.*;
+public interface AsyncService {
 
-public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
+    /**
+     * @param name
+     * @param interval how often the queue should be polled for missed tasks in seconds.
+     * @param callable
+     * @return
+     * @throws SchedulerException
+     */
+    Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
 
-    JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+    Queue getQueue(String name);
 
-    QueueRef getQueue(String name);
+    Task schedule(Queue queue, String... args);
 
-    ExecutionRef schedule(QueueRef queue, String... args);
-
-    ExecutionRef update(ExecutionRef ref);
-
-    interface QueueRef {
-    }
-
-    interface ExecutionRef {
-        List<String> getArguments();
-
-        Date getScheduled();
-
-        Date getLastRun();
-
-        Date getCompleted();
-
-        boolean isDone();
-    }
+    /**
+     * Polls for a new state of the execution.
+     */
+    Task update(Task ref);
 
     interface AsyncCallable {
         void run() throws Exception;
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
new file mode 100644
index 0000000..1df0ab6
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
@@ -0,0 +1,194 @@
+package io.trygvis.queue;
+
+import org.quartz.*;
+import org.slf4j.*;
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.stereotype.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.annotation.*;
+import org.springframework.transaction.support.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.Arrays.*;
+import static java.util.concurrent.TimeUnit.*;
+import static org.springframework.transaction.annotation.Propagation.*;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
+
+@Component
+public class JdbcAsyncService implements AsyncService {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+    private final Map<String, QueueThread> queues = new HashMap<>();
+
+    @Autowired
+    private TransactionTemplate transactionTemplate;
+
+    @Autowired
+    private QueueDao queueDao;
+
+    @Autowired
+    private TaskDao taskDao;
+
+    @Transactional(propagation = REQUIRED)
+    public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
+        log.info("registerQueue: ENTER");
+
+        Queue q = queueDao.findByName(name);
+
+        log.info("q = {}", q);
+
+        final long interval_;
+        if (q == null) {
+            q = new Queue(name, interval * 1000);
+            queueDao.insert(q);
+            interval_ = interval;
+        } else {
+            // Found an existing queue. Use the Settings from the database.
+            interval_ = q.interval;
+        }
+
+        final QueueThread queueThread = new QueueThread(q, callable);
+        queues.put(name, queueThread);
+
+        registerSynchronization(new TransactionSynchronizationAdapter() {
+            public void afterCompletion(int status) {
+                log.info("status = {}", status);
+                if (status == TransactionSynchronization.STATUS_COMMITTED) {
+                    executor.scheduleAtFixedRate(new Runnable() {
+                        public void run() {
+                            queueThread.ping();
+                        }
+                    }, 1000, 1000 * interval_, MILLISECONDS);
+                    Thread thread = new Thread(queueThread, name);
+                    thread.setDaemon(true);
+                    thread.start();
+                }
+            }
+        });
+
+        log.info("registerQueue: LEAVE");
+        return q;
+    }
+
+    public Queue getQueue(String name) {
+        QueueThread queueThread = queues.get(name);
+
+        if (queueThread == null) {
+            throw new RuntimeException("No such queue: '" + name + "'.");
+        }
+
+        return queueThread.queue;
+    }
+
+    @Transactional(propagation = REQUIRED)
+    public Task schedule(Queue queue, String... args) {
+        log.info("schedule: ENTER");
+
+        Date scheduled = new Date();
+
+        StringBuilder arguments = new StringBuilder();
+        for (String arg : args) {
+            arguments.append(arg).append(' ');
+        }
+
+        long id = taskDao.insert(queue.name, scheduled, arguments.toString());
+        Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args));
+        log.info("task = {}", task);
+        queues.get(queue.name).ping();
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        log.info("schedule: LEAVE");
+        return task;
+    }
+
+    @Transactional(readOnly = true)
+    public Task update(Task ref) {
+        return taskDao.findById(ref.id);
+    }
+
+    class QueueThread implements Runnable {
+        public boolean shouldRun = true;
+
+        public final Queue queue;
+
+        private final AsyncCallable callable;
+
+        QueueThread(Queue queue, AsyncCallable callable) {
+            this.queue = queue;
+            this.callable = callable;
+        }
+
+        public void ping() {
+            log.info("Sending ping to " + queue);
+            synchronized (this) {
+                notify();
+            }
+        }
+
+        public void run() {
+            while (shouldRun) {
+                List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+
+                log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
+
+                try {
+                    for (final Task task : tasks) {
+                        try {
+                            executeTask(task);
+                        } catch (TransactionException | TaskFailureException e) {
+                            log.warn("Task execution failed", e);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.warn("Error while executing tasks.", e);
+                }
+
+                synchronized (this) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+            }
+        }
+
+        private void executeTask(final Task task) {
+            final Date run = new Date();
+            log.info("Setting last run on task. date = {}, task = {}", run, task);
+            transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+                protected void doInTransactionWithoutResult(TransactionStatus status) {
+                    taskDao.update(task.registerRun());
+                }
+            });
+
+            transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+                protected void doInTransactionWithoutResult(TransactionStatus status) {
+                    try {
+                        callable.run();
+                        Date completed = new Date();
+                        Task t = task.registerComplete(completed);
+                        log.info("Completed task: {}", t);
+                        taskDao.update(t);
+                    } catch (Exception e) {
+                        throw new TaskFailureException(e);
+                    }
+                }
+            });
+        }
+    }
+
+    private static class TaskFailureException extends RuntimeException {
+        public TaskFailureException(Exception e) {
+            super(e);
+        }
+    }
+}
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java
deleted file mode 100755
index e715ac7..0000000
--- a/src/main/java/io/trygvis/queue/JpaAsyncService.java
+++ /dev/null
@@ -1,257 +0,0 @@
-package io.trygvis.queue;
-
-import io.trygvis.data.*;
-import io.trygvis.model.Queue;
-import io.trygvis.model.*;
-import org.quartz.*;
-import org.slf4j.*;
-import org.springframework.beans.factory.annotation.*;
-import org.springframework.stereotype.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.annotation.*;
-import org.springframework.transaction.support.*;
-
-import javax.persistence.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
-
-@SuppressWarnings("SpringJavaAutowiringInspection")
-@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
-    @PersistenceContext
-    private EntityManager entityManager;
-
-    @Autowired
-    private TransactionTemplate transactionTemplate;
-
-    @Autowired
-    private QueueRepository queueRepository;
-
-    @Autowired
-    private TaskRepository taskRepository;
-
-    @Transactional
-    public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
-        log.info("registerQueue: ENTER");
-
-        Queue q = queueRepository.findByName(name);
-
-        log.info("q = {}", q);
-
-        if (q == null) {
-            q = new Queue(name, interval);
-            q = queueRepository.save(q);
-        } else {
-            boolean dirty = false;
-            if (interval != q.getInterval()) {
-                q.setInterval(interval);
-                dirty = true;
-            }
-
-            if (dirty) {
-                q = queueRepository.save(q);
-            }
-        }
-
-        log.info("q = {}", q);
-        entityManager.flush();
-//        entityManager.detach(q);
-        log.info("q = {}", q);
-
-        JpaQueueRef queueRef = new JpaQueueRef(q);
-
-        log.info("registerQueue: LEAVE");
-
-        registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
-
-        return queueRef;
-    }
-
-    @Transactional(readOnly = true)
-    public JpaQueueRef getQueue(String name) {
-        Queue queue = queueRepository.findByName(name);
-
-        if (queue == null) {
-            throw new RollbackException("No such queue: '" + name + "'.");
-        }
-
-        entityManager.detach(queue);
-
-        return new JpaQueueRef(queue);
-    }
-
-    @Transactional
-    public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
-        log.info("schedule: ENTER");
-        Date scheduled = new Date();
-        Task task = new Task(queue.queue, scheduled, args);
-        log.info("task = {}", task);
-        taskRepository.save(task);
-        log.info("task = {}", task);
-//        entityManager.detach(task);
-        log.info("schedule: LEAVE");
-        return new JpaExecutionRef(task);
-    }
-
-    @Transactional(readOnly = true)
-    public JpaExecutionRef update(JpaExecutionRef ref) {
-        return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
-    }
-
-    public static class JpaQueueRef implements AsyncService.QueueRef {
-        public final Queue queue;
-
-        JpaQueueRef(Queue queue) {
-            this.queue = queue;
-        }
-
-        public String toString() {
-            return "JpaQueueRef{" +
-                    "queue=" + queue +
-                    '}';
-        }
-    }
-
-    public static class JpaExecutionRef implements AsyncService.ExecutionRef {
-        private final Task task;
-
-        public JpaExecutionRef(Task task) {
-            this.task = task;
-        }
-
-        public List<String> getArguments() {
-            return Arrays.asList(task.getArguments());
-        }
-
-        public Date getScheduled() {
-            return task.getScheduled();
-        }
-
-        public Date getLastRun() {
-            return task.getLastRun();
-        }
-
-        public Date getCompleted() {
-            return task.getCompleted();
-        }
-
-        public boolean isDone() {
-            return task.isDone();
-        }
-
-        public String toString() {
-            return "JpaExecutionRef{" +
-                    "task=" + task +
-                    '}';
-        }
-    }
-
-    private static class TaskFailureException extends RuntimeException {
-        public TaskFailureException(Exception e) {
-            super(e);
-        }
-    }
-
-    private class CheckTimerTask implements Runnable {
-        private final AsyncCallable callable;
-        private final JpaQueueRef queueRef;
-
-        private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
-            this.callable = callable;
-            this.queueRef = queueRef;
-        }
-
-        public void run() {
-            log.info("JpaAsyncService$CheckTimerTask.run");
-
-            List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
-
-            System.out.println("tasks.size() = " + tasks.size());
-
-            try {
-                for (final Task task : tasks) {
-                    try {
-                        executeTask(task);
-                    } catch (TransactionException | TaskFailureException e) {
-                        log.warn("Task execution failed", e);
-                    }
-                }
-            } catch (Exception e) {
-                log.warn("Error while executing tasks.", e);
-            }
-        }
-
-        private void executeTask(final Task task) {
-            final Date run = new Date();
-            log.info("Setting last run on task. date = {}, task = {}", run, task);
-            transactionTemplate.execute(new TransactionCallbackWithoutResult() {
-                protected void doInTransactionWithoutResult(TransactionStatus status) {
-                    task.registerRun();
-                    taskRepository.save(task);
-                }
-            });
-
-            transactionTemplate.execute(new TransactionCallbackWithoutResult() {
-                protected void doInTransactionWithoutResult(TransactionStatus status) {
-                    try {
-                        callable.run();
-                        Date completed = new Date();
-                        log.info("Setting completed on task. date = {}, task = {}", completed, task);
-                        task.registerComplete(completed);
-                        taskRepository.save(task);
-                    } catch (Exception e) {
-                        throw new TaskFailureException(e);
-                    }
-                }
-            });
-        }
-    }
-
-    private class MyTransactionSynchronization implements TransactionSynchronization {
-
-        private final AsyncCallable callable;
-
-        private final int interval;
-
-        private final JpaQueueRef queueRef;
-
-        public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
-            this.callable = callable;
-            this.interval = interval;
-            this.queueRef = queueRef;
-        }
-
-        public void suspend() {
-        }
-
-        public void resume() {
-        }
-
-        public void flush() {
-        }
-
-        public void beforeCommit(boolean readOnly) {
-        }
-
-        public void beforeCompletion() {
-        }
-
-        public void afterCommit() {
-        }
-
-        public void afterCompletion(int status) {
-            log.info("status = {}", status);
-            if (status == TransactionSynchronization.STATUS_COMMITTED) {
-                executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
-            }
-        }
-    }
-}
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java
new file mode 100755
index 0000000..15003f7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Queue.java
@@ -0,0 +1,24 @@
+package io.trygvis.queue;
+
+public class Queue {
+
+    public final String name;
+
+    public final long interval;
+
+    public Queue(String name, long interval) {
+        this.name = name;
+        this.interval = interval;
+    }
+
+    public Queue withInterval(int interval) {
+        return new Queue(name, interval);
+    }
+
+    public String toString() {
+        return "Queue{" +
+                "name='" + name + '\'' +
+                ", interval=" + interval +
+                '}';
+    }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
new file mode 100644
index 0000000..a3a79ca
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -0,0 +1,34 @@
+package io.trygvis.queue;
+
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.stereotype.*;
+
+import java.sql.*;
+
+import static org.springframework.dao.support.DataAccessUtils.*;
+
+@Component
+public class QueueDao {
+
+    @Autowired
+    private JdbcTemplate jdbcTemplate;
+
+    public Queue findByName(String name) {
+        return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name));
+    }
+
+    public void insert(Queue q) {
+        jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval);
+    }
+
+    public void update(Queue q) {
+        jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name);
+    }
+
+    private class QueueRowMapper implements RowMapper<Queue> {
+        public Queue mapRow(ResultSet rs, int rowNum) throws SQLException {
+            return new Queue(rs.getString(1), rs.getLong(2));
+        }
+    }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
new file mode 100755
index 0000000..7f64c77
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -0,0 +1,55 @@
+package io.trygvis.queue;
+
+import java.util.*;
+
+public class Task {
+
+    public final long id;
+
+    public final String queue;
+
+    public final Date scheduled;
+
+    public final Date lastRun;
+
+    public final int runCount;
+
+    public final Date completed;
+
+    public final List<String> arguments;
+
+    Task(long id, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+        this.id = id;
+        this.queue = queue;
+        this.scheduled = scheduled;
+        this.lastRun = lastRun;
+        this.runCount = runCount;
+        this.completed = completed;
+
+        this.arguments = arguments;
+    }
+
+    public Task registerRun() {
+        return new Task(id, queue, scheduled, new Date(), runCount + 1, completed, arguments);
+    }
+
+    public Task registerComplete(Date completed) {
+        return new Task(id, queue, scheduled, lastRun, runCount, new Date(), arguments);
+    }
+
+    public String toString() {
+        return "Task{" +
+                "id=" + id +
+                ", queue=" + queue +
+                ", scheduled=" + scheduled +
+                ", lastRun=" + lastRun +
+                ", runCount=" + runCount +
+                ", completed=" + completed +
+                ", arguments='" + arguments + '\'' +
+                '}';
+    }
+
+    public boolean isDone() {
+        return completed != null;
+    }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
new file mode 100644
index 0000000..2e407a5
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -0,0 +1,54 @@
+package io.trygvis.queue;
+
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.stereotype.*;
+
+import java.sql.*;
+import java.util.Date;
+import java.util.*;
+
+import static java.util.Arrays.*;
+
+@Component
+public class TaskDao {
+
+    @Autowired
+    private JdbcTemplate jdbcTemplate;
+
+    public long insert(String queue, Date scheduled, String arguments) {
+        jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " +
+                "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments);
+        return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class);
+    }
+
+    public Task findById(long id) {
+        return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?",
+                new TaskRowMapper(), id);
+    }
+
+    public List<Task> findByNameAndCompletedIsNull(String name) {
+        return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL",
+                new TaskRowMapper(), name);
+    }
+
+    public void update(Task task) {
+        jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?",
+                task.scheduled, task.lastRun, task.runCount, task.completed, task.id);
+    }
+
+    private class TaskRowMapper implements RowMapper<Task> {
+        public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments";
+
+        public Task mapRow(ResultSet rs, int rowNum) throws SQLException {
+            return new Task(
+                    rs.getLong(1),
+                    rs.getString(2),
+                    rs.getTimestamp(3),
+                    rs.getTimestamp(4),
+                    rs.getInt(5),
+                    rs.getTimestamp(6),
+                    asList(rs.getString(7).split(" ")));
+        }
+    }
+}
diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java
index 5df4dac..5dd845f 100755
--- a/src/main/java/io/trygvis/spring/Config.java
+++ b/src/main/java/io/trygvis/spring/Config.java
@@ -11,6 +11,7 @@ import org.springframework.context.annotation.*;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.support.*;
 import org.springframework.data.jpa.repository.config.*;
+import org.springframework.jdbc.core.*;
 import org.springframework.jdbc.datasource.*;
 import org.springframework.orm.hibernate4.*;
 import org.springframework.orm.jpa.*;
@@ -18,12 +19,13 @@ import org.springframework.transaction.*;
 import org.springframework.transaction.annotation.*;
 import org.springframework.transaction.support.*;
 
-import java.util.*;
 import javax.persistence.*;
 import javax.sql.*;
+import java.util.*;
 
 import static org.hibernate.cfg.AvailableSettings.*;
 import static org.hibernate.ejb.AvailableSettings.*;
+import static org.springframework.transaction.TransactionDefinition.*;
 
 @Configuration
 @ComponentScan(basePackages = "io.trygvis")
@@ -40,6 +42,11 @@ public class Config {
         }};
     }
 
+    @Bean
+    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
+        return new JdbcTemplate(dataSource);
+    }
+
 //    public SpringBeanJobFactory springBeanJobFactory() {
 //        SpringBeanJobFactory factory = new SpringBeanJobFactory();
 //        return factory;
@@ -164,6 +171,9 @@ public class Config {
 
     @Bean
     public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
-        return new TransactionTemplate(platformTransactionManager);
+        DefaultTransactionDefinition td = new DefaultTransactionDefinition();
+        td.setPropagationBehavior(PROPAGATION_REQUIRED);
+        td.setIsolationLevel(ISOLATION_READ_COMMITTED);
+        return new TransactionTemplate(platformTransactionManager, td);
     }
 }
-- 
cgit v1.2.3