aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-02 12:32:29 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-02 12:32:29 +0200
commit52084f7b4e6f50c90b3255cdf2eb9deab560c970 (patch)
treeeed9abd7fe9825aaacfd4fe24c8fd363cc41fed1 /src
parent7d704feb86c44fca57941d223e8605b55fcf68f0 (diff)
downloadquartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.gz
quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.bz2
quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.xz
quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.zip
o Making some test cases.
Diffstat (limited to 'src')
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java5
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java109
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java78
-rw-r--r--src/main/java/io/trygvis/async/SqlEffect.java12
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java39
-rw-r--r--src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java102
-rw-r--r--src/main/java/io/trygvis/queue/QueueDao.java47
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java119
-rw-r--r--src/main/java/io/trygvis/spring/DefaultConfig.java17
-rw-r--r--src/main/resources/create-postgresql.sql (renamed from src/main/resources/create.sql)6
-rwxr-xr-xsrc/test/java/io/trygvis/test/Article.java (renamed from src/main/java/io/trygvis/model/Article.java)2
-rwxr-xr-xsrc/test/java/io/trygvis/test/CreateArticleCallable.java (renamed from src/main/java/io/trygvis/CreateArticleCallable.java)3
-rwxr-xr-xsrc/test/java/io/trygvis/test/Main.java (renamed from src/main/java/io/trygvis/Main.java)18
-rwxr-xr-xsrc/test/java/io/trygvis/test/UpdateArticleCallable.java (renamed from src/main/java/io/trygvis/UpdateArticleCallable.java)2
-rw-r--r--src/test/java/io/trygvis/test/spring/PlainSpringTest.java59
-rwxr-xr-xsrc/test/java/io/trygvis/test/spring/TestConfig.java (renamed from src/main/java/io/trygvis/spring/Config.java)4
-rwxr-xr-xsrc/test/resources/applicationContext.xml (renamed from src/main/resources/applicationContext.xml)2
17 files changed, 445 insertions, 179 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index e90a0e4..57c1af8 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -2,8 +2,8 @@ package io.trygvis.async;
import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
-import org.quartz.SchedulerException;
+import java.sql.SQLException;
import java.util.List;
/**
@@ -16,9 +16,8 @@ public interface AsyncService {
* @param interval how often the queue should be polled for missed tasks in seconds.
* @param callable
* @return
- * @throws SchedulerException
*/
- Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+ Queue registerQueue(final String name, final int interval, AsyncCallable callable);
Queue getQueue(String name);
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 4e78a37..c34330e 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -4,88 +4,66 @@ import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueDao;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
-import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.TransactionSynchronization;
-import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.springframework.transaction.annotation.Propagation.REQUIRED;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
-@Component
-public class JdbcAsyncService implements AsyncService {
+public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
private final Map<String, QueueThread> queues = new HashMap<>();
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueDao queueDao;
-
- @Autowired
- private TaskDao taskDao;
+ public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException {
+ QueueDao queueDao = new QueueDao(c);
- @Transactional(propagation = REQUIRED)
- public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
log.info("registerQueue: ENTER");
Queue q = queueDao.findByName(name);
log.info("q = {}", q);
- final long interval_;
if (q == null) {
q = new Queue(name, interval);
queueDao.insert(q);
- interval_ = interval;
- } else {
- // Found an existing queue. Use the Settings from the database.
- interval_ = q.interval;
}
- final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable);
+ final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);
queues.put(name, queueThread);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- log.info("Transaction completed with status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval_, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
- }
- }
- });
-
log.info("registerQueue: LEAVE");
return q;
}
+ public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
+ final QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: " + name);
+ }
+
+ long interval = queueThread.queue.interval;
+ log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ queueThread.ping();
+ }
+ }, 10, interval, SECONDS);
+ Thread thread = new Thread(queueThread, name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
public Queue getQueue(String name) {
QueueThread queueThread = queues.get(name);
@@ -96,16 +74,17 @@ public class JdbcAsyncService implements AsyncService {
return queueThread.queue;
}
- @Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, String... args) {
- return scheduleInner(null, queue, args);
+ public Task schedule(Connection c, final Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, null, queue, args);
}
- public Task schedule(long parent, Queue queue, String... args) {
- return scheduleInner(parent, queue, args);
+ public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, parent, queue, args);
}
- private Task scheduleInner(Long parent, final Queue queue, String... args) {
+ private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
Date scheduled = new Date();
StringBuilder arguments = new StringBuilder();
@@ -114,27 +93,22 @@ public class JdbcAsyncService implements AsyncService {
}
long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args));
+ Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));
log.info("Created task = {}", task);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- queues.get(queue.name).ping();
- }
- }
- });
-
return task;
}
- @Transactional
- public Task await(Task task, long timeout) {
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
final long start = currentTimeMillis();
final long end = start + timeout;
while (currentTimeMillis() < end) {
- task = update(task);
+ task = update(c, task);
+
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
try {
sleep(100);
@@ -146,8 +120,9 @@ public class JdbcAsyncService implements AsyncService {
return task;
}
- @Transactional(readOnly = true)
- public Task update(Task ref) {
+ public Task update(Connection c, Task ref) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
return taskDao.findById(ref.id);
}
}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 69466df..00c46b4 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -5,37 +5,33 @@ import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.transaction.TransactionException;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException;
+
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
- public boolean shouldRun = true;
-
- private boolean checkForNewTasks;
+ private final SqlEffectExecutor sqlEffectExecutor;
- private boolean busy;
+ private final AsyncService.AsyncCallable callable;
public final Queue queue;
- private final TaskDao taskDao;
+ public boolean shouldRun = true;
- private final TransactionTemplate transactionTemplate;
+ private boolean checkForNewTasks;
- private final AsyncService.AsyncCallable callable;
+ private boolean busy;
- QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
- this.queue = queue;
- this.taskDao = taskDao;
- this.transactionTemplate = transactionTemplate;
+ QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) {
+ this.sqlEffectExecutor = sqlEffectExecutor;
this.callable = callable;
+ this.queue = queue;
}
public void ping() {
@@ -52,19 +48,25 @@ class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
try {
- List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
- public List<Task> doInTransaction(TransactionStatus status) {
- return taskDao.findByNameAndCompletedIsNull(queue.name);
+// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
+// public List<Task> doInTransaction(TransactionStatus status) {
+// return taskDao.findByNameAndCompletedIsNull(queue.name);
+// }
+// });
+ List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection connection) throws SQLException {
+ return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
- if(tasks.size() > 0) {
+ if (tasks.size() > 0) {
for (final Task task : tasks) {
try {
executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
+ } catch (SqlExecutionException | TaskFailureException e) {
log.warn("Task execution failed", e);
}
}
@@ -96,24 +98,44 @@ class QueueThread implements Runnable {
private void executeTask(final Task task) {
final Date run = new Date();
log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- taskDao.update(task.registerRun());
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection connection) throws SQLException {
+ new TaskDao(connection).update(task.registerRun());
}
});
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// taskDao.update(task.registerRun());
+// }
+// });
+
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
try {
callable.run(task.arguments);
Date completed = new Date();
Task t = task.registerComplete(completed);
log.info("Completed task: {}", t);
- taskDao.update(t);
+ new TaskDao(c).update(t);
} catch (Exception e) {
throw new TaskFailureException(e);
}
}
});
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// try {
+// callable.run(task.arguments);
+// Date completed = new Date();
+// Task t = task.registerComplete(completed);
+// log.info("Completed task: {}", t);
+// taskDao.update(t);
+// } catch (Exception e) {
+// throw new TaskFailureException(e);
+// }
+// }
+// });
}
}
diff --git a/src/main/java/io/trygvis/async/SqlEffect.java b/src/main/java/io/trygvis/async/SqlEffect.java
new file mode 100644
index 0000000..d0c4e9b
--- /dev/null
+++ b/src/main/java/io/trygvis/async/SqlEffect.java
@@ -0,0 +1,12 @@
+package io.trygvis.async;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public interface SqlEffect<A> {
+ A doInConnection(Connection c) throws SQLException;
+
+ interface Void {
+ void doInConnection(Connection c) throws SQLException;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
new file mode 100644
index 0000000..c8abbd3
--- /dev/null
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -0,0 +1,39 @@
+package io.trygvis.async;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class SqlEffectExecutor {
+
+ private final DataSource dataSource;
+
+ public SqlEffectExecutor(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public <A> A execute(SqlEffect<A> effect) {
+ try (Connection c = dataSource.getConnection()) {
+ return effect.doInConnection(c);
+ } catch (SQLException e) {
+ throw new SqlExecutionException(e);
+ }
+ }
+
+ public void execute(SqlEffect.Void effect) {
+ try (Connection c = dataSource.getConnection()) {
+ effect.doInConnection(c);
+ } catch (SQLException e) {
+ throw new SqlExecutionException(e);
+ }
+ }
+
+ public static class SqlExecutionException extends RuntimeException {
+ public final SQLException exception;
+
+ public SqlExecutionException(SQLException ex) {
+ super(ex);
+ this.exception = ex;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
new file mode 100644
index 0000000..8517c68
--- /dev/null
+++ b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
@@ -0,0 +1,102 @@
+package io.trygvis.async.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.SqlEffectExecutor;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import javax.annotation.PostConstruct;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.springframework.transaction.annotation.Propagation.REQUIRED;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
+
+public class SpringJdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ private final TransactionTemplate transactionTemplate;
+
+ private final JdbcTemplate jdbcTemplate;
+
+ private SqlEffectExecutor sqlEffectExecutor;
+
+ final JdbcAsyncService jdbcAsyncService;
+
+ public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
+ this.transactionTemplate = transactionTemplate;
+ this.jdbcTemplate = jdbcTemplate;
+ jdbcAsyncService = new JdbcAsyncService();
+ sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource());
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) {
+ return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
+ @Override
+ public Queue doInConnection(Connection c) throws SQLException {
+
+ Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("Transaction completed with status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ jdbcAsyncService.startQueue(executor, name);
+ }
+ }
+ });
+
+ log.info("registerQueue: LEAVE");
+ return q;
+ }
+ });
+ }
+
+ public Queue getQueue(String name) {
+ return jdbcAsyncService.getQueue(name);
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Task schedule(final Queue queue, final String... args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.schedule(c, queue, args);
+ }
+ });
+ }
+
+ public Task schedule(final long parent, final Queue queue, final String... args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.schedule(c, parent, queue, args);
+ }
+ });
+ }
+
+ @Transactional(readOnly = true)
+ public Task update(final Task ref) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.update(c, ref);
+ }
+ });
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
index 63dde2a..2f69e11 100644
--- a/src/main/java/io/trygvis/queue/QueueDao.java
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -1,36 +1,45 @@
package io.trygvis.queue;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.RowMapper;
-import org.springframework.stereotype.Component;
-
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import static org.springframework.dao.support.DataAccessUtils.singleResult;
-
-@Component
public class QueueDao {
- @Autowired
- private JdbcTemplate jdbcTemplate;
+ private final Connection connection;
- public Queue findByName(String name) {
- return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name));
+ public QueueDao(Connection connection) {
+ this.connection = connection;
}
- public void insert(Queue q) {
- jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval);
+ public Queue findByName(String name) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) {
+ stmt.setString(1, name);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
}
- public void update(Queue q) {
- jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name);
+ public void insert(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) {
+ int i = 1;
+ stmt.setString(i++, q.name);
+ stmt.setLong(i, q.interval);
+ stmt.executeUpdate();
+ }
}
- private class QueueRowMapper implements RowMapper<Queue> {
- public Queue mapRow(ResultSet rs, int rowNum) throws SQLException {
- return new Queue(rs.getString(1), rs.getLong(2));
+ public void update(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) {
+ int i = 1;
+ stmt.setLong(i++, q.interval);
+ stmt.setString(i, q.name);
+ stmt.executeUpdate();
}
}
+
+ public Queue mapRow(ResultSet rs) throws SQLException {
+ return new Queue(rs.getString(1), rs.getLong(2));
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index a59dcbb..5459933 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -1,69 +1,104 @@
package io.trygvis.queue;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.RowMapper;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import static java.util.Arrays.asList;
-import static org.springframework.transaction.annotation.Propagation.MANDATORY;
-@Component
public class TaskDao {
- @Autowired
- private JdbcTemplate jdbcTemplate;
+ private final Connection connection;
+
+ public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
- public long insert(String queue, Date scheduled, String arguments) {
- return this.insert(null, queue, scheduled, arguments);
+ public TaskDao(Connection connection) {
+ this.connection = connection;
}
- @Transactional(propagation = MANDATORY)
- public long insert(Long parent, String queue, Date scheduled, String arguments) {
- jdbcTemplate.update("INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
- "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)", parent, queue, scheduled, arguments);
- return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class);
+ public long insert(String queue, Date scheduled, String arguments) throws SQLException {
+ return insert(null, queue, scheduled, arguments);
}
- @Transactional(propagation = MANDATORY)
- public Task findById(long id) {
- return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?",
- new TaskRowMapper(), id);
+ public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)";
+ try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+ int i = 1;
+ if (parent == null) {
+ stmt.setNull(i++, Types.BIGINT);
+ } else {
+ stmt.setLong(i++, parent);
+ }
+ stmt.setString(i++, queue);
+ stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
+ stmt.setString(i, arguments);
+ stmt.executeUpdate();
+ }
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) {
+ ResultSet rs = stmt.executeQuery();
+ rs.next();
+ return rs.getLong(1);
+ }
}
- @Transactional(propagation = MANDATORY)
- public List<Task> findByNameAndCompletedIsNull(String name) {
- return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL",
- new TaskRowMapper(), name);
+ public Task findById(long id) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
}
- @Transactional(propagation = MANDATORY)
- public void update(Task task) {
- jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?",
- task.scheduled, task.lastRun, task.runCount, task.completed, task.id);
+ public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
+ int i = 1;
+ stmt.setString(i, name);
+ ResultSet rs = stmt.executeQuery();
+ List<Task> list = new ArrayList<>();
+ while (rs.next()) {
+ list.add(mapRow(rs));
+ }
+ return list;
+ }
}
- private class TaskRowMapper implements RowMapper<Task> {
- public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
+ public void update(Task task) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
+ int i = 1;
+ stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
+ setTimestamp(stmt, i++, task.lastRun);
+ stmt.setInt(i++, task.runCount);
+ setTimestamp(stmt, i++, task.completed);
+ stmt.setLong(i, task.id);
+ stmt.executeUpdate();
+ }
+ }
- public Task mapRow(ResultSet rs, int rowNum) throws SQLException {
- String arguments = rs.getString(8);
- return new Task(
- rs.getLong(1),
- rs.getLong(2),
- rs.getString(3),
- rs.getTimestamp(4),
- rs.getTimestamp(5),
- rs.getInt(6),
- rs.getTimestamp(7),
- arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList());
+ private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
+ if (date == null) {
+ stmt.setNull(parameterIndex, Types.TIMESTAMP);
+ } else {
+ stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime()));
}
}
+
+ public Task mapRow(ResultSet rs) throws SQLException {
+ String arguments = rs.getString(8);
+ return new Task(
+ rs.getLong(1),
+ rs.getLong(2),
+ rs.getString(3),
+ rs.getTimestamp(4),
+ rs.getTimestamp(5),
+ rs.getInt(6),
+ rs.getTimestamp(7),
+ arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList());
+ }
}
diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java
new file mode 100644
index 0000000..af8f644
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/DefaultConfig.java
@@ -0,0 +1,17 @@
+package io.trygvis.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.async.spring.SpringJdbcAsyncService;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.support.TransactionTemplate;
+
+@Configuration
+public class DefaultConfig {
+
+ @Bean
+ public AsyncService asyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
+ return new SpringJdbcAsyncService(transactionTemplate, jdbcTemplate);
+ }
+}
diff --git a/src/main/resources/create.sql b/src/main/resources/create-postgresql.sql
index f7f2939..39672f0 100644
--- a/src/main/resources/create.sql
+++ b/src/main/resources/create-postgresql.sql
@@ -2,7 +2,7 @@ BEGIN;
DROP TABLE IF EXISTS task;
DROP TABLE IF EXISTS queue;
-DROP SEQUENCE IF EXISTS task_id;
+DROP SEQUENCE IF EXISTS task_seq;
CREATE TABLE queue (
name VARCHAR(100) NOT NULL,
@@ -12,7 +12,7 @@ CREATE TABLE queue (
CREATE TABLE task (
id BIGINT NOT NULL,
- parent BIGINT NOT NULL,
+ parent BIGINT,
queue VARCHAR(100) NOT NULL,
scheduled TIMESTAMP NOT NULL,
last_run TIMESTAMP,
@@ -24,6 +24,6 @@ CREATE TABLE task (
CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id)
);
-CREATE SEQUENCE task_id;
+CREATE SEQUENCE task_seq;
COMMIT;
diff --git a/src/main/java/io/trygvis/model/Article.java b/src/test/java/io/trygvis/test/Article.java
index e86c570..d4f54ce 100755
--- a/src/main/java/io/trygvis/model/Article.java
+++ b/src/test/java/io/trygvis/test/Article.java
@@ -1,4 +1,4 @@
-package io.trygvis.model;
+package io.trygvis.test;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java
index 471b59d..f68cd5b 100755
--- a/src/main/java/io/trygvis/CreateArticleCallable.java
+++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java
@@ -1,6 +1,5 @@
-package io.trygvis;
+package io.trygvis.test;
-import io.trygvis.model.Article;
import io.trygvis.async.AsyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/io/trygvis/Main.java b/src/test/java/io/trygvis/test/Main.java
index 08b9b75..721df61 100755
--- a/src/main/java/io/trygvis/Main.java
+++ b/src/test/java/io/trygvis/test/Main.java
@@ -1,4 +1,4 @@
-package io.trygvis;
+package io.trygvis.test;
import io.trygvis.async.AsyncService;
import io.trygvis.queue.Queue;
@@ -11,8 +11,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
@@ -83,13 +81,13 @@ public class Main {
final int count = 1;
log.info("Creating {} tasks", count);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- for (int i = 0; i < count; i++) {
- tasks.add(asyncService.schedule(q));
- }
- }
- });
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// for (int i = 0; i < count; i++) {
+// tasks.add(asyncService.schedule(q));
+// }
+// }
+// });
log.info("Created {} tasks", count);
while (true) {
diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
index a910855..aae28b9 100755
--- a/src/main/java/io/trygvis/UpdateArticleCallable.java
+++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
@@ -1,4 +1,4 @@
-package io.trygvis;
+package io.trygvis.test;
import io.trygvis.async.AsyncService;
import org.slf4j.Logger;
diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
new file mode 100644
index 0000000..9a7a436
--- /dev/null
+++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
@@ -0,0 +1,59 @@
+package io.trygvis.test.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.queue.Queue;
+import io.trygvis.spring.DefaultConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.System.getProperty;
+import static java.lang.System.setProperty;
+import static org.fest.assertions.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = {TestConfig.class, DefaultConfig.class})
+public class PlainSpringTest {
+
+ @Autowired
+ private AsyncService asyncService;
+
+ static {
+ String username = getProperty("user.name");
+ setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
+ setProperty("database.username", username);
+ setProperty("database.password", username);
+ }
+
+ @Test
+ public void testBasic() throws SQLException, InterruptedException {
+ final AtomicReference<List<String>> ref = new AtomicReference<>();
+ Queue test = asyncService.registerQueue("test", 10, new AsyncService.AsyncCallable() {
+ public void run(List<String> arguments) throws Exception {
+ System.out.println("PlainSpringTest.run");
+ ref.set(arguments);
+ synchronized (ref) {
+ ref.notify();
+ }
+ }
+ });
+
+ synchronized (ref) {
+ System.out.println("Scheduling task");
+ asyncService.schedule(test, "hello", "world");
+ System.out.println("Waiting");
+ ref.wait(1000);
+ }
+
+ List<String> args = ref.get();
+ assertNotNull(args);
+ assertThat(args).containsExactly("hello", "world");
+ }
+}
diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/test/java/io/trygvis/test/spring/TestConfig.java
index df4b2e2..7853cb5 100755
--- a/src/main/java/io/trygvis/spring/Config.java
+++ b/src/test/java/io/trygvis/test/spring/TestConfig.java
@@ -1,4 +1,4 @@
-package io.trygvis.spring;
+package io.trygvis.test.spring;
import com.jolbox.bonecp.BoneCPDataSource;
import org.springframework.beans.factory.annotation.Value;
@@ -20,7 +20,7 @@ import javax.sql.DataSource;
@ComponentScan(basePackages = "io.trygvis")
@EnableTransactionManagement
//@EnableJpaRepositories(basePackages = "io.trygvis.data")
-public class Config {
+public class TestConfig {
@Bean
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception {
diff --git a/src/main/resources/applicationContext.xml b/src/test/resources/applicationContext.xml
index 2d33d8c..5f173b3 100755
--- a/src/main/resources/applicationContext.xml
+++ b/src/test/resources/applicationContext.xml
@@ -17,6 +17,6 @@
http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd">
<context:annotation-config/>
- <bean class="io.trygvis.spring.Config"/>
+ <bean class="io.trygvis.test.spring.TestConfig"/>
</beans>