aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-04 20:54:56 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-04 20:54:56 +0200
commit7465fdb9aa847d29dacc56adbe473f1c1ceb298e (patch)
treed04b5c859fc090a57355e7bc0e51a043cddc907b /src/main/java/io/trygvis/async
parent1eeef021c65c85c24d62a0cc1ee4a746a601beb5 (diff)
downloadquartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.gz
quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.bz2
quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.xz
quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.zip
o Creating a QueueService on top of the DAOs.
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java13
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java39
-rw-r--r--src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java100
3 files changed, 13 insertions, 139 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index 57c1af8..17d53e9 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -3,7 +3,6 @@ package io.trygvis.async;
import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
-import java.sql.SQLException;
import java.util.List;
/**
@@ -11,19 +10,13 @@ import java.util.List;
*/
public interface AsyncService {
- /**
- * @param name
- * @param interval how often the queue should be polled for missed tasks in seconds.
- * @param callable
- * @return
- */
- Queue registerQueue(final String name, final int interval, AsyncCallable callable);
+ void registerQueue(Queue queue, final AsyncService.AsyncCallable callable);
Queue getQueue(String name);
- Task schedule(Queue queue, String... args);
+ Task schedule(Queue queue, List<String> args);
- Task schedule(long parent, Queue queue, String... args);
+ Task schedule(long parent, Queue queue, List<String> args);
/**
* Polls for a new state of the execution.
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index c34330e..310c59b 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,7 +1,6 @@
package io.trygvis.async;
import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueDao;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
@@ -11,12 +10,12 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
public class JdbcAsyncService {
@@ -24,25 +23,12 @@ public class JdbcAsyncService {
private final Map<String, QueueThread> queues = new HashMap<>();
- public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException {
- QueueDao queueDao = new QueueDao(c);
+ public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) {
+ final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue);
- log.info("registerQueue: ENTER");
-
- Queue q = queueDao.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- queueDao.insert(q);
- }
-
- final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);
- queues.put(name, queueThread);
+ queues.put(queue.name, queueThread);
log.info("registerQueue: LEAVE");
- return q;
}
public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
@@ -74,26 +60,21 @@ public class JdbcAsyncService {
return queueThread.queue;
}
- public Task schedule(Connection c, final Queue queue, String... args) throws SQLException {
+ public Task schedule(Connection c, final Queue queue, List<String> args) throws SQLException {
return scheduleInner(c, null, queue, args);
}
- public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException {
+ public Task schedule(Connection c, long parent, Queue queue, List<String> args) throws SQLException {
return scheduleInner(c, parent, queue, args);
}
- private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException {
+ private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
TaskDao taskDao = new TaskDao(c);
Date scheduled = new Date();
- StringBuilder arguments = new StringBuilder();
- for (String arg : args) {
- arguments.append(arg).append(' ');
- }
-
- long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));
+ long id = taskDao.insert(parent, queue.name, scheduled, args);
+ Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args);
log.info("Created task = {}", task);
return task;
@@ -123,6 +104,6 @@ public class JdbcAsyncService {
public Task update(Connection c, Task ref) throws SQLException {
TaskDao taskDao = new TaskDao(c);
- return taskDao.findById(ref.id);
+ return taskDao.findById(ref.id());
}
}
diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
deleted file mode 100644
index 327dffa..0000000
--- a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package io.trygvis.async.spring;
-
-import io.trygvis.async.AsyncService;
-import io.trygvis.async.JdbcAsyncService;
-import io.trygvis.async.SqlEffectExecutor;
-import io.trygvis.queue.Queue;
-import io.trygvis.queue.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.jdbc.core.ConnectionCallback;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.TransactionSynchronization;
-import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionTemplate;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static org.springframework.transaction.annotation.Propagation.REQUIRED;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
-
-public class SpringJdbcAsyncService implements AsyncService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
- private final TransactionTemplate transactionTemplate;
-
- private final JdbcTemplate jdbcTemplate;
-
- private SqlEffectExecutor sqlEffectExecutor;
-
- final JdbcAsyncService jdbcAsyncService;
-
- public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
- this.transactionTemplate = transactionTemplate;
- this.jdbcTemplate = jdbcTemplate;
- jdbcAsyncService = new JdbcAsyncService();
- sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource());
- }
-
- @Transactional(propagation = REQUIRED)
- public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) {
- return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
- @Override
- public Queue doInConnection(Connection c) throws SQLException {
-
- Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable);
-
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- log.info("Transaction completed with status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- jdbcAsyncService.startQueue(executor, name);
- }
- }
- });
-
- log.info("registerQueue: LEAVE");
- return q;
- }
- });
- }
-
- public Queue getQueue(String name) {
- return jdbcAsyncService.getQueue(name);
- }
-
- @Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, final String... args) {
- return jdbcTemplate.execute(new ConnectionCallback<Task>() {
- @Override
- public Task doInConnection(Connection c) throws SQLException {
- return jdbcAsyncService.schedule(c, queue, args);
- }
- });
- }
-
- public Task schedule(final long parent, final Queue queue, final String... args) {
- return jdbcTemplate.execute(new ConnectionCallback<Task>() {
- @Override
- public Task doInConnection(Connection c) throws SQLException {
- return jdbcAsyncService.schedule(c, parent, queue, args);
- }
- });
- }
-
- @Transactional(readOnly = true)
- public Task update(final Task ref) {
- return jdbcTemplate.execute(new ConnectionCallback<Task>() {
- @Override
- public Task doInConnection(Connection c) throws SQLException {
- return jdbcAsyncService.update(c, ref);
- }
- });
- }
-}