diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-04 20:54:56 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-04 20:54:56 +0200 |
commit | 7465fdb9aa847d29dacc56adbe473f1c1ceb298e (patch) | |
tree | d04b5c859fc090a57355e7bc0e51a043cddc907b /src/main/java/io/trygvis/async | |
parent | 1eeef021c65c85c24d62a0cc1ee4a746a601beb5 (diff) | |
download | quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.gz quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.bz2 quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.xz quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.zip |
o Creating a QueueService on top of the DAOs.
Diffstat (limited to 'src/main/java/io/trygvis/async')
3 files changed, 13 insertions, 139 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index 57c1af8..17d53e9 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -3,7 +3,6 @@ package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; -import java.sql.SQLException; import java.util.List; /** @@ -11,19 +10,13 @@ import java.util.List; */ public interface AsyncService { - /** - * @param name - * @param interval how often the queue should be polled for missed tasks in seconds. - * @param callable - * @return - */ - Queue registerQueue(final String name, final int interval, AsyncCallable callable); + void registerQueue(Queue queue, final AsyncService.AsyncCallable callable); Queue getQueue(String name); - Task schedule(Queue queue, String... args); + Task schedule(Queue queue, List<String> args); - Task schedule(long parent, Queue queue, String... args); + Task schedule(long parent, Queue queue, List<String> args); /** * Polls for a new state of the execution. diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index c34330e..310c59b 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,7 +1,6 @@ package io.trygvis.async; import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueDao; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import org.slf4j.Logger; @@ -11,12 +10,12 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; -import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; public class JdbcAsyncService { @@ -24,25 +23,12 @@ public class JdbcAsyncService { private final Map<String, QueueThread> queues = new HashMap<>(); - public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException { - QueueDao queueDao = new QueueDao(c); + public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) { + final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue); - log.info("registerQueue: ENTER"); - - Queue q = queueDao.findByName(name); - - log.info("q = {}", q); - - if (q == null) { - q = new Queue(name, interval); - queueDao.insert(q); - } - - final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q); - queues.put(name, queueThread); + queues.put(queue.name, queueThread); log.info("registerQueue: LEAVE"); - return q; } public void startQueue(ScheduledThreadPoolExecutor executor, String name) { @@ -74,26 +60,21 @@ public class JdbcAsyncService { return queueThread.queue; } - public Task schedule(Connection c, final Queue queue, String... args) throws SQLException { + public Task schedule(Connection c, final Queue queue, List<String> args) throws SQLException { return scheduleInner(c, null, queue, args); } - public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException { + public Task schedule(Connection c, long parent, Queue queue, List<String> args) throws SQLException { return scheduleInner(c, parent, queue, args); } - private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException { + private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException { TaskDao taskDao = new TaskDao(c); Date scheduled = new Date(); - StringBuilder arguments = new StringBuilder(); - for (String arg : args) { - arguments.append(arg).append(' '); - } - - long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); - Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args)); + long id = taskDao.insert(parent, queue.name, scheduled, args); + Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args); log.info("Created task = {}", task); return task; @@ -123,6 +104,6 @@ public class JdbcAsyncService { public Task update(Connection c, Task ref) throws SQLException { TaskDao taskDao = new TaskDao(c); - return taskDao.findById(ref.id); + return taskDao.findById(ref.id()); } } diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java deleted file mode 100644 index 327dffa..0000000 --- a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.trygvis.async.spring; - -import io.trygvis.async.AsyncService; -import io.trygvis.async.JdbcAsyncService; -import io.trygvis.async.SqlEffectExecutor; -import io.trygvis.queue.Queue; -import io.trygvis.queue.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.jdbc.core.ConnectionCallback; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; - -import java.sql.Connection; -import java.sql.SQLException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; - -public class SpringJdbcAsyncService implements AsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - private final TransactionTemplate transactionTemplate; - - private final JdbcTemplate jdbcTemplate; - - private SqlEffectExecutor sqlEffectExecutor; - - final JdbcAsyncService jdbcAsyncService; - - public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { - this.transactionTemplate = transactionTemplate; - this.jdbcTemplate = jdbcTemplate; - jdbcAsyncService = new JdbcAsyncService(); - sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); - } - - @Transactional(propagation = REQUIRED) - public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) { - return jdbcTemplate.execute(new ConnectionCallback<Queue>() { - @Override - public Queue doInConnection(Connection c) throws SQLException { - - Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - log.info("Transaction completed with status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - jdbcAsyncService.startQueue(executor, name); - } - } - }); - - log.info("registerQueue: LEAVE"); - return q; - } - }); - } - - public Queue getQueue(String name) { - return jdbcAsyncService.getQueue(name); - } - - @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, final String... args) { - return jdbcTemplate.execute(new ConnectionCallback<Task>() { - @Override - public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.schedule(c, queue, args); - } - }); - } - - public Task schedule(final long parent, final Queue queue, final String... args) { - return jdbcTemplate.execute(new ConnectionCallback<Task>() { - @Override - public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.schedule(c, parent, queue, args); - } - }); - } - - @Transactional(readOnly = true) - public Task update(final Task ref) { - return jdbcTemplate.execute(new ConnectionCallback<Task>() { - @Override - public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.update(c, ref); - } - }); - } -} |