aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-02 12:32:29 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-02 12:32:29 +0200
commit52084f7b4e6f50c90b3255cdf2eb9deab560c970 (patch)
treeeed9abd7fe9825aaacfd4fe24c8fd363cc41fed1 /src/main/java
parent7d704feb86c44fca57941d223e8605b55fcf68f0 (diff)
downloadquartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.gz
quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.bz2
quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.xz
quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.zip
o Making some test cases.
Diffstat (limited to 'src/main/java')
-rwxr-xr-xsrc/main/java/io/trygvis/CreateArticleCallable.java42
-rwxr-xr-xsrc/main/java/io/trygvis/Main.java117
-rwxr-xr-xsrc/main/java/io/trygvis/UpdateArticleCallable.java44
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java5
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java109
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java78
-rw-r--r--src/main/java/io/trygvis/async/SqlEffect.java12
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java39
-rw-r--r--src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java102
-rwxr-xr-xsrc/main/java/io/trygvis/model/Article.java55
-rw-r--r--src/main/java/io/trygvis/queue/QueueDao.java47
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java119
-rwxr-xr-xsrc/main/java/io/trygvis/spring/Config.java172
-rw-r--r--src/main/java/io/trygvis/spring/DefaultConfig.java17
14 files changed, 369 insertions, 589 deletions
diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java
deleted file mode 100755
index 471b59d..0000000
--- a/src/main/java/io/trygvis/CreateArticleCallable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package io.trygvis;
-
-import io.trygvis.model.Article;
-import io.trygvis.async.AsyncService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-
-import static org.springframework.transaction.annotation.Propagation.MANDATORY;
-
-@Component("createArticle")
-@Transactional(propagation = MANDATORY)
-public class CreateArticleCallable implements AsyncService.AsyncCallable {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
-// @PersistenceContext
-// private EntityManager entityManager;
-
- private Random random = new Random();
-
- public void run(List<String> arguments) throws Exception {
- log.info("CreateArticeJob.run: BEGIN");
-
- if (random.nextInt() % 3 == 0) {
- throw new RuntimeException("failing create article");
- }
-
- Date now = new Date();
-
- log.info("now = {}", now);
-
- Article article = new Article(new Date(), null, "title", "body");
-// entityManager.persist(article);
-
- log.info("CreateArticeJob.run: END");
- }
-}
diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java
deleted file mode 100755
index 08b9b75..0000000
--- a/src/main/java/io/trygvis/Main.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package io.trygvis;
-
-import io.trygvis.async.AsyncService;
-import io.trygvis.queue.Queue;
-import io.trygvis.queue.Task;
-import org.hibernate.dialect.PostgreSQL82Dialect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.bridge.SLF4JBridgeHandler;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionTemplate;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import static java.lang.System.*;
-import static java.lang.Thread.sleep;
-
-@Component
-public class Main {
- private static final Logger log = LoggerFactory.getLogger(Main.class);
-
- public static void main(String[] args) throws Exception {
- SLF4JBridgeHandler.install();
-
- String username = getProperty("user.name");
- setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
- setProperty("database.username", username);
- setProperty("database.password", username);
-// setProperty("hibernate.showSql", "true");
- setProperty("hibernate.hbm2ddl.auto", "create"); // create
- setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName());
-
- log.info("Starting context");
- ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
- log.info("Started context");
-
- try {
- context.getBean(Main.class).run();
-// log.info("Sleeping");
-// sleep(1000 * 1000);
- } catch (Exception e) {
- e.printStackTrace(System.out);
- }
-
- log.info("Stopping context");
- context.stop();
- log.info("Stopped context");
-
- exit(0);
- }
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private AsyncService asyncService;
-
- @Autowired
- @Qualifier("createArticle")
- private AsyncService.AsyncCallable createArticleCallable;
-
- @Autowired
- @Qualifier("updateArticle")
- private AsyncService.AsyncCallable updateArticleCallable;
-
- public void run() throws Exception {
- log.info("Main.run");
-
- final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable);
-// log.info("queue registered: ref = {}", q);
-// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
-
-// q = asyncService.getQueue("create-queue");
-
- final List<Task> tasks = new ArrayList<>();
-
- final int count = 1;
- log.info("Creating {} tasks", count);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- for (int i = 0; i < count; i++) {
- tasks.add(asyncService.schedule(q));
- }
- }
- });
- log.info("Created {} tasks", count);
-
- while (true) {
- sleep(10000);
-
- log.info("Checking for status of {} tasks", tasks.size());
- for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
- Task task = iterator.next();
-
- task = asyncService.update(task);
-
-// log.info("task = {}", task);
-
- if (task.isDone()) {
- iterator.remove();
- }
- }
-
- if (tasks.isEmpty()) {
- log.info("No more tasks");
- break;
- }
- }
- }
-}
diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java
deleted file mode 100755
index a910855..0000000
--- a/src/main/java/io/trygvis/UpdateArticleCallable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package io.trygvis;
-
-import io.trygvis.async.AsyncService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-
-@Component("updateArticle")
-public class UpdateArticleCallable
- implements AsyncService.AsyncCallable {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final Random r = new Random();
-
-// @PersistenceContext
-// private EntityManager entityManager;
-
-// @Transactional(propagation = REQUIRES_NEW)
- public void run(List<String> arguments) throws Exception {
- log.info("UpdateArticeJob.run: BEGIN");
-
- Date now = new Date();
-
- log.info("now = {}", now);
-
-/*
- TypedQuery<Article> q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class));
-
- List<Article> list = q.getResultList();
- log.info("Got {} articles", list.size());
-
- Article a = list.get(r.nextInt(list.size()));
- a.setUpdated(new Date());
-
- entityManager.persist(a);
-*/
-
- log.info("UpdateArticeJob.run: END");
- }
-}
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index e90a0e4..57c1af8 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -2,8 +2,8 @@ package io.trygvis.async;
import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
-import org.quartz.SchedulerException;
+import java.sql.SQLException;
import java.util.List;
/**
@@ -16,9 +16,8 @@ public interface AsyncService {
* @param interval how often the queue should be polled for missed tasks in seconds.
* @param callable
* @return
- * @throws SchedulerException
*/
- Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+ Queue registerQueue(final String name, final int interval, AsyncCallable callable);
Queue getQueue(String name);
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 4e78a37..c34330e 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -4,88 +4,66 @@ import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueDao;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
-import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.TransactionSynchronization;
-import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.springframework.transaction.annotation.Propagation.REQUIRED;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
-@Component
-public class JdbcAsyncService implements AsyncService {
+public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
private final Map<String, QueueThread> queues = new HashMap<>();
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueDao queueDao;
-
- @Autowired
- private TaskDao taskDao;
+ public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException {
+ QueueDao queueDao = new QueueDao(c);
- @Transactional(propagation = REQUIRED)
- public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
log.info("registerQueue: ENTER");
Queue q = queueDao.findByName(name);
log.info("q = {}", q);
- final long interval_;
if (q == null) {
q = new Queue(name, interval);
queueDao.insert(q);
- interval_ = interval;
- } else {
- // Found an existing queue. Use the Settings from the database.
- interval_ = q.interval;
}
- final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable);
+ final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);
queues.put(name, queueThread);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- log.info("Transaction completed with status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval_, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
- }
- }
- });
-
log.info("registerQueue: LEAVE");
return q;
}
+ public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
+ final QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: " + name);
+ }
+
+ long interval = queueThread.queue.interval;
+ log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ queueThread.ping();
+ }
+ }, 10, interval, SECONDS);
+ Thread thread = new Thread(queueThread, name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
public Queue getQueue(String name) {
QueueThread queueThread = queues.get(name);
@@ -96,16 +74,17 @@ public class JdbcAsyncService implements AsyncService {
return queueThread.queue;
}
- @Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, String... args) {
- return scheduleInner(null, queue, args);
+ public Task schedule(Connection c, final Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, null, queue, args);
}
- public Task schedule(long parent, Queue queue, String... args) {
- return scheduleInner(parent, queue, args);
+ public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, parent, queue, args);
}
- private Task scheduleInner(Long parent, final Queue queue, String... args) {
+ private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
Date scheduled = new Date();
StringBuilder arguments = new StringBuilder();
@@ -114,27 +93,22 @@ public class JdbcAsyncService implements AsyncService {
}
long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args));
+ Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));
log.info("Created task = {}", task);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- queues.get(queue.name).ping();
- }
- }
- });
-
return task;
}
- @Transactional
- public Task await(Task task, long timeout) {
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
final long start = currentTimeMillis();
final long end = start + timeout;
while (currentTimeMillis() < end) {
- task = update(task);
+ task = update(c, task);
+
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
try {
sleep(100);
@@ -146,8 +120,9 @@ public class JdbcAsyncService implements AsyncService {
return task;
}
- @Transactional(readOnly = true)
- public Task update(Task ref) {
+ public Task update(Connection c, Task ref) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
return taskDao.findById(ref.id);
}
}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 69466df..00c46b4 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -5,37 +5,33 @@ import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.transaction.TransactionException;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException;
+
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
- public boolean shouldRun = true;
-
- private boolean checkForNewTasks;
+ private final SqlEffectExecutor sqlEffectExecutor;
- private boolean busy;
+ private final AsyncService.AsyncCallable callable;
public final Queue queue;
- private final TaskDao taskDao;
+ public boolean shouldRun = true;
- private final TransactionTemplate transactionTemplate;
+ private boolean checkForNewTasks;
- private final AsyncService.AsyncCallable callable;
+ private boolean busy;
- QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
- this.queue = queue;
- this.taskDao = taskDao;
- this.transactionTemplate = transactionTemplate;
+ QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) {
+ this.sqlEffectExecutor = sqlEffectExecutor;
this.callable = callable;
+ this.queue = queue;
}
public void ping() {
@@ -52,19 +48,25 @@ class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
try {
- List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
- public List<Task> doInTransaction(TransactionStatus status) {
- return taskDao.findByNameAndCompletedIsNull(queue.name);
+// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
+// public List<Task> doInTransaction(TransactionStatus status) {
+// return taskDao.findByNameAndCompletedIsNull(queue.name);
+// }
+// });
+ List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection connection) throws SQLException {
+ return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
- if(tasks.size() > 0) {
+ if (tasks.size() > 0) {
for (final Task task : tasks) {
try {
executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
+ } catch (SqlExecutionException | TaskFailureException e) {
log.warn("Task execution failed", e);
}
}
@@ -96,24 +98,44 @@ class QueueThread implements Runnable {
private void executeTask(final Task task) {
final Date run = new Date();
log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- taskDao.update(task.registerRun());
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection connection) throws SQLException {
+ new TaskDao(connection).update(task.registerRun());
}
});
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// taskDao.update(task.registerRun());
+// }
+// });
+
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
try {
callable.run(task.arguments);
Date completed = new Date();
Task t = task.registerComplete(completed);
log.info("Completed task: {}", t);
- taskDao.update(t);
+ new TaskDao(c).update(t);
} catch (Exception e) {
throw new TaskFailureException(e);
}
}
});
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// try {
+// callable.run(task.arguments);
+// Date completed = new Date();
+// Task t = task.registerComplete(completed);
+// log.info("Completed task: {}", t);
+// taskDao.update(t);
+// } catch (Exception e) {
+// throw new TaskFailureException(e);
+// }
+// }
+// });
}
}
diff --git a/src/main/java/io/trygvis/async/SqlEffect.java b/src/main/java/io/trygvis/async/SqlEffect.java
new file mode 100644
index 0000000..d0c4e9b
--- /dev/null
+++ b/src/main/java/io/trygvis/async/SqlEffect.java
@@ -0,0 +1,12 @@
+package io.trygvis.async;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public interface SqlEffect<A> {
+ A doInConnection(Connection c) throws SQLException;
+
+ interface Void {
+ void doInConnection(Connection c) throws SQLException;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
new file mode 100644
index 0000000..c8abbd3
--- /dev/null
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -0,0 +1,39 @@
+package io.trygvis.async;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class SqlEffectExecutor {
+
+ private final DataSource dataSource;
+
+ public SqlEffectExecutor(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public <A> A execute(SqlEffect<A> effect) {
+ try (Connection c = dataSource.getConnection()) {
+ return effect.doInConnection(c);
+ } catch (SQLException e) {
+ throw new SqlExecutionException(e);
+ }
+ }
+
+ public void execute(SqlEffect.Void effect) {
+ try (Connection c = dataSource.getConnection()) {
+ effect.doInConnection(c);
+ } catch (SQLException e) {
+ throw new SqlExecutionException(e);
+ }
+ }
+
+ public static class SqlExecutionException extends RuntimeException {
+ public final SQLException exception;
+
+ public SqlExecutionException(SQLException ex) {
+ super(ex);
+ this.exception = ex;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
new file mode 100644
index 0000000..8517c68
--- /dev/null
+++ b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
@@ -0,0 +1,102 @@
+package io.trygvis.async.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.SqlEffectExecutor;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import javax.annotation.PostConstruct;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.springframework.transaction.annotation.Propagation.REQUIRED;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
+
+public class SpringJdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ private final TransactionTemplate transactionTemplate;
+
+ private final JdbcTemplate jdbcTemplate;
+
+ private SqlEffectExecutor sqlEffectExecutor;
+
+ final JdbcAsyncService jdbcAsyncService;
+
+ public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
+ this.transactionTemplate = transactionTemplate;
+ this.jdbcTemplate = jdbcTemplate;
+ jdbcAsyncService = new JdbcAsyncService();
+ sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource());
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) {
+ return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
+ @Override
+ public Queue doInConnection(Connection c) throws SQLException {
+
+ Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("Transaction completed with status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ jdbcAsyncService.startQueue(executor, name);
+ }
+ }
+ });
+
+ log.info("registerQueue: LEAVE");
+ return q;
+ }
+ });
+ }
+
+ public Queue getQueue(String name) {
+ return jdbcAsyncService.getQueue(name);
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Task schedule(final Queue queue, final String... args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.schedule(c, queue, args);
+ }
+ });
+ }
+
+ public Task schedule(final long parent, final Queue queue, final String... args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.schedule(c, parent, queue, args);
+ }
+ });
+ }
+
+ @Transactional(readOnly = true)
+ public Task update(final Task ref) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.update(c, ref);
+ }
+ });
+ }
+}
diff --git a/src/main/java/io/trygvis/model/Article.java b/src/main/java/io/trygvis/model/Article.java
deleted file mode 100755
index e86c570..0000000
--- a/src/main/java/io/trygvis/model/Article.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package io.trygvis.model;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.SequenceGenerator;
-import java.util.Date;
-
-@Entity
-public class Article {
- @Id
- @SequenceGenerator(name="id_seq", sequenceName="id_seq")
- @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_seq")
- private Integer id;
- private Date created;
- private Date updated;
- private String title;
- private String body;
-
- @SuppressWarnings("UnusedDeclaration")
- private Article() {
- }
-
- public Article(Date created, Date updated, String title, String body) {
- this.created = created;
- this.updated = updated;
- this.title = title;
- this.body = body;
- }
-
- public Integer getId() {
- return id;
- }
-
- public Date getCreated() {
- return created;
- }
-
- public Date getUpdated() {
- return updated;
- }
-
- public void setUpdated(Date updated) {
- this.updated = updated;
- }
-
- public String getTitle() {
- return title;
- }
-
- public String getBody() {
- return body;
- }
-}
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
index 63dde2a..2f69e11 100644
--- a/src/main/java/io/trygvis/queue/QueueDao.java
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -1,36 +1,45 @@
package io.trygvis.queue;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.RowMapper;
-import org.springframework.stereotype.Component;
-
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import static org.springframework.dao.support.DataAccessUtils.singleResult;
-
-@Component
public class QueueDao {
- @Autowired
- private JdbcTemplate jdbcTemplate;
+ private final Connection connection;
- public Queue findByName(String name) {
- return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name));
+ public QueueDao(Connection connection) {
+ this.connection = connection;
}
- public void insert(Queue q) {
- jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval);
+ public Queue findByName(String name) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) {
+ stmt.setString(1, name);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
}
- public void update(Queue q) {
- jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name);
+ public void insert(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) {
+ int i = 1;
+ stmt.setString(i++, q.name);
+ stmt.setLong(i, q.interval);
+ stmt.executeUpdate();
+ }
}
- private class QueueRowMapper implements RowMapper<Queue> {
- public Queue mapRow(ResultSet rs, int rowNum) throws SQLException {
- return new Queue(rs.getString(1), rs.getLong(2));
+ public void update(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) {
+ int i = 1;
+ stmt.setLong(i++, q.interval);
+ stmt.setString(i, q.name);
+ stmt.executeUpdate();
}
}
+
+ public Queue mapRow(ResultSet rs) throws SQLException {
+ return new Queue(rs.getString(1), rs.getLong(2));
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index a59dcbb..5459933 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -1,69 +1,104 @@
package io.trygvis.queue;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.RowMapper;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import static java.util.Arrays.asList;
-import static org.springframework.transaction.annotation.Propagation.MANDATORY;
-@Component
public class TaskDao {
- @Autowired
- private JdbcTemplate jdbcTemplate;
+ private final Connection connection;
+
+ public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
- public long insert(String queue, Date scheduled, String arguments) {
- return this.insert(null, queue, scheduled, arguments);
+ public TaskDao(Connection connection) {
+ this.connection = connection;
}
- @Transactional(propagation = MANDATORY)
- public long insert(Long parent, String queue, Date scheduled, String arguments) {
- jdbcTemplate.update("INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
- "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)", parent, queue, scheduled, arguments);
- return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class);
+ public long insert(String queue, Date scheduled, String arguments) throws SQLException {
+ return insert(null, queue, scheduled, arguments);
}
- @Transactional(propagation = MANDATORY)
- public Task findById(long id) {
- return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?",
- new TaskRowMapper(), id);
+ public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)";
+ try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+ int i = 1;
+ if (parent == null) {
+ stmt.setNull(i++, Types.BIGINT);
+ } else {
+ stmt.setLong(i++, parent);
+ }
+ stmt.setString(i++, queue);
+ stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
+ stmt.setString(i, arguments);
+ stmt.executeUpdate();
+ }
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) {
+ ResultSet rs = stmt.executeQuery();
+ rs.next();
+ return rs.getLong(1);
+ }
}
- @Transactional(propagation = MANDATORY)
- public List<Task> findByNameAndCompletedIsNull(String name) {
- return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL",
- new TaskRowMapper(), name);
+ public Task findById(long id) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
}
- @Transactional(propagation = MANDATORY)
- public void update(Task task) {
- jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?",
- task.scheduled, task.lastRun, task.runCount, task.completed, task.id);
+ public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
+ int i = 1;
+ stmt.setString(i, name);
+ ResultSet rs = stmt.executeQuery();
+ List<Task> list = new ArrayList<>();
+ while (rs.next()) {
+ list.add(mapRow(rs));
+ }
+ return list;
+ }
}
- private class TaskRowMapper implements RowMapper<Task> {
- public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
+ public void update(Task task) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
+ int i = 1;
+ stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
+ setTimestamp(stmt, i++, task.lastRun);
+ stmt.setInt(i++, task.runCount);
+ setTimestamp(stmt, i++, task.completed);
+ stmt.setLong(i, task.id);
+ stmt.executeUpdate();
+ }
+ }
- public Task mapRow(ResultSet rs, int rowNum) throws SQLException {
- String arguments = rs.getString(8);
- return new Task(
- rs.getLong(1),
- rs.getLong(2),
- rs.getString(3),
- rs.getTimestamp(4),
- rs.getTimestamp(5),
- rs.getInt(6),
- rs.getTimestamp(7),
- arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList());
+ private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
+ if (date == null) {
+ stmt.setNull(parameterIndex, Types.TIMESTAMP);
+ } else {
+ stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime()));
}
}
+
+ public Task mapRow(ResultSet rs) throws SQLException {
+ String arguments = rs.getString(8);
+ return new Task(
+ rs.getLong(1),
+ rs.getLong(2),
+ rs.getString(3),
+ rs.getTimestamp(4),
+ rs.getTimestamp(5),
+ rs.getInt(6),
+ rs.getTimestamp(7),
+ arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList());
+ }
}
diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java
deleted file mode 100755
index df4b2e2..0000000
--- a/src/main/java/io/trygvis/spring/Config.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package io.trygvis.spring;
-
-import com.jolbox.bonecp.BoneCPDataSource;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
-import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
-import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.annotation.EnableTransactionManagement;
-import org.springframework.transaction.support.TransactionTemplate;
-
-import javax.sql.DataSource;
-
-@Configuration
-@ComponentScan(basePackages = "io.trygvis")
-@EnableTransactionManagement
-//@EnableJpaRepositories(basePackages = "io.trygvis.data")
-public class Config {
-
- @Bean
- public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception {
- return new PropertySourcesPlaceholderConfigurer() {{
-// setLocation(new UrlResource("file:environment.properties"));
- setProperties(System.getProperties());
- setLocalOverride(true);
- }};
- }
-
- @Bean
- public JdbcTemplate jdbcTemplate(DataSource dataSource) {
- return new JdbcTemplate(dataSource);
- }
-
-// public SpringBeanJobFactory springBeanJobFactory() {
-// SpringBeanJobFactory factory = new SpringBeanJobFactory();
-// return factory;
-// }
-
-/*
- @Bean
- public SchedulerFactoryBean quartz(DataSource dataSource, PlatformTransactionManager transactionManager) {
- SchedulerFactoryBean bean = new SchedulerFactoryBean();
- bean.setApplicationContextSchedulerContextKey("applicationContext");
- bean.setDataSource(dataSource);
- bean.setTransactionManager(transactionManager);
-// bean.setJobFactory(new JobFactory() {
-// public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
-// Class<? extends Job> klass = bundle.getJobDetail().getJobClass();
-// }
-// });
-
- Properties quartzProperties = new Properties();
- quartzProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
-// quartzProperties.setProperty("org.quartz.jobStore.selectWithLockSQL", "false");
- quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
- quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");
- bean.setQuartzProperties(quartzProperties);
- return bean;
- }
-*/
-
- // This turns out to be fairly useless as Spring won't register them automatically.
- // It's probably better to use @Scheduled/@Async instead
- /*
- @Bean(name = "my-job")
- public JobDetailFactoryBean myJobDetailBean() {
- JobDetailFactoryBean bean = new JobDetailFactoryBean();
- bean.setJobClass(MyJob.class);
- bean.setDurability(true);
-
- return bean;
- }
-
- @Bean
- public CronTriggerFactoryBean myJobTrigger(JobDetail jobDetail) {
- CronTriggerFactoryBean bean = new CronTriggerFactoryBean();
- bean.setName("my-trigger");
- bean.setBeanName("my-job");
- bean.setJobDetail(jobDetail);
- bean.setCronExpression("0/10 * * * * ?");
-
- return bean;
- }
- */
-
- @Bean
- public DataSource dataSource(@Value("${database.url}") String jdbcUrl,
- @Value("${database.username}") String username,
- @Value("${database.password}") String password) {
- BoneCPDataSource ds = new BoneCPDataSource();
-
- ds.setLogStatementsEnabled(true);
-
- ds.setJdbcUrl(jdbcUrl);
- ds.setUsername(username);
- ds.setPassword(password);
-
- ds.setIdleConnectionTestPeriodInSeconds(60);
- ds.setIdleMaxAgeInSeconds(240);
- ds.setMaxConnectionsPerPartition(40);
- ds.setMinConnectionsPerPartition(0);
- ds.setPartitionCount(1);
- ds.setAcquireIncrement(1);
- ds.setStatementsCacheSize(1000);
- ds.setReleaseHelperThreads(3);
- ds.setStatisticsEnabled(true);
- return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
- }
-
-/*
- @Bean
- public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource,
- @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl,
- @Value("${hibernate.showSql:false}") boolean showSql,
- @Value("${hibernate.dialect}") String dialect) {
- LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean();
- x.setDataSource(dataSource);
- x.setJpaPropertyMap(createJpaMap(hbm2ddl, showSql, dialect));
- x.setPackagesToScan(Article.class.getPackage().getName());
- HibernatePersistence persistenceProvider = new HibernatePersistence();
- x.setPersistenceProvider(persistenceProvider);
- return x;
- }
-
- public static Map<String, Object> createJpaMap(String hbm2ddl, boolean showSql, String dialect) {
- Map<String, Object> map = new HashMap<>();
- map.put(HBM2DDL_AUTO, hbm2ddl);
- map.put(FORMAT_SQL, showSql);
- map.put(SHOW_SQL, showSql);
- map.put(USE_SQL_COMMENTS, showSql);
- map.put(GENERATE_STATISTICS, true);
- map.put(NAMING_STRATEGY, ImprovedNamingStrategy.class.getName());
-
- map.put(DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_WRITE.toString());
- map.put(CURRENT_SESSION_CONTEXT_CLASS, SpringSessionContext.class.getName());
-// map.put(CACHE_REGION_FACTORY, EhCacheRegionFactory.class.getName());
- map.put(USE_SECOND_LEVEL_CACHE, false);
- map.put(USE_QUERY_CACHE, false);
-// map.put(SHARED_CACHE_MODE, SharedCacheMode.ENABLE_SELECTIVE.toString());
-
- map.put(DIALECT, dialect);
- map.put("hibernate.temp.use_jdbc_metadata_defaults", "false");
-
- return map;
- }
-
- @Bean
- public SessionFactory sessionFactory(LocalContainerEntityManagerFactoryBean entityManagerFactory) {
- return ((HibernateEntityManagerFactory) entityManagerFactory.nativeEntityManagerFactory).getSessionFactory();
- }
-
- @Bean
- public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
- return new JpaTransactionManager(entityManagerFactory);
- }
-*/
-
- @Bean
- public PlatformTransactionManager transactionManager(DataSource dataSource) {
- return new DataSourceTransactionManager(dataSource);
- }
-
- @Bean
- public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
- return new TransactionTemplate(platformTransactionManager);
- }
-}
diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java
new file mode 100644
index 0000000..af8f644
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/DefaultConfig.java
@@ -0,0 +1,17 @@
+package io.trygvis.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.async.spring.SpringJdbcAsyncService;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.support.TransactionTemplate;
+
+@Configuration
+public class DefaultConfig {
+
+ @Bean
+ public AsyncService asyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
+ return new SpringJdbcAsyncService(transactionTemplate, jdbcTemplate);
+ }
+}