From 52084f7b4e6f50c90b3255cdf2eb9deab560c970 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 2 Jun 2013 12:32:29 +0200 Subject: o Making some test cases. --- .../java/io/trygvis/CreateArticleCallable.java | 42 ----- src/main/java/io/trygvis/Main.java | 117 -------------- .../java/io/trygvis/UpdateArticleCallable.java | 44 ------ src/main/java/io/trygvis/async/AsyncService.java | 5 +- .../java/io/trygvis/async/JdbcAsyncService.java | 109 +++++-------- src/main/java/io/trygvis/async/QueueThread.java | 78 ++++++---- src/main/java/io/trygvis/async/SqlEffect.java | 12 ++ .../java/io/trygvis/async/SqlEffectExecutor.java | 39 +++++ .../async/spring/SpringJdbcAsyncService.java | 102 ++++++++++++ src/main/java/io/trygvis/model/Article.java | 55 ------- src/main/java/io/trygvis/queue/QueueDao.java | 47 +++--- src/main/java/io/trygvis/queue/TaskDao.java | 119 +++++++++----- src/main/java/io/trygvis/spring/Config.java | 172 --------------------- src/main/java/io/trygvis/spring/DefaultConfig.java | 17 ++ src/main/resources/applicationContext.xml | 22 --- src/main/resources/create-postgresql.sql | 29 ++++ src/main/resources/create.sql | 29 ---- src/test/java/io/trygvis/test/Article.java | 55 +++++++ .../io/trygvis/test/CreateArticleCallable.java | 41 +++++ src/test/java/io/trygvis/test/Main.java | 115 ++++++++++++++ .../io/trygvis/test/UpdateArticleCallable.java | 44 ++++++ .../io/trygvis/test/spring/PlainSpringTest.java | 59 +++++++ .../java/io/trygvis/test/spring/TestConfig.java | 172 +++++++++++++++++++++ src/test/resources/applicationContext.xml | 22 +++ 24 files changed, 906 insertions(+), 640 deletions(-) delete mode 100755 src/main/java/io/trygvis/CreateArticleCallable.java delete mode 100755 src/main/java/io/trygvis/Main.java delete mode 100755 src/main/java/io/trygvis/UpdateArticleCallable.java create mode 100644 src/main/java/io/trygvis/async/SqlEffect.java create mode 100644 src/main/java/io/trygvis/async/SqlEffectExecutor.java create mode 100644 src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java delete mode 100755 src/main/java/io/trygvis/model/Article.java delete mode 100755 src/main/java/io/trygvis/spring/Config.java create mode 100644 src/main/java/io/trygvis/spring/DefaultConfig.java delete mode 100755 src/main/resources/applicationContext.xml create mode 100644 src/main/resources/create-postgresql.sql delete mode 100644 src/main/resources/create.sql create mode 100755 src/test/java/io/trygvis/test/Article.java create mode 100755 src/test/java/io/trygvis/test/CreateArticleCallable.java create mode 100755 src/test/java/io/trygvis/test/Main.java create mode 100755 src/test/java/io/trygvis/test/UpdateArticleCallable.java create mode 100644 src/test/java/io/trygvis/test/spring/PlainSpringTest.java create mode 100755 src/test/java/io/trygvis/test/spring/TestConfig.java create mode 100755 src/test/resources/applicationContext.xml (limited to 'src') diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java deleted file mode 100755 index 471b59d..0000000 --- a/src/main/java/io/trygvis/CreateArticleCallable.java +++ /dev/null @@ -1,42 +0,0 @@ -package io.trygvis; - -import io.trygvis.model.Article; -import io.trygvis.async.AsyncService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - -import java.util.Date; -import java.util.List; -import java.util.Random; - -import static org.springframework.transaction.annotation.Propagation.MANDATORY; - -@Component("createArticle") -@Transactional(propagation = MANDATORY) -public class CreateArticleCallable implements AsyncService.AsyncCallable { - private final Logger log = LoggerFactory.getLogger(getClass()); - -// @PersistenceContext -// private EntityManager entityManager; - - private Random random = new Random(); - - public void run(List arguments) throws Exception { - log.info("CreateArticeJob.run: BEGIN"); - - if (random.nextInt() % 3 == 0) { - throw new RuntimeException("failing create article"); - } - - Date now = new Date(); - - log.info("now = {}", now); - - Article article = new Article(new Date(), null, "title", "body"); -// entityManager.persist(article); - - log.info("CreateArticeJob.run: END"); - } -} diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java deleted file mode 100755 index 08b9b75..0000000 --- a/src/main/java/io/trygvis/Main.java +++ /dev/null @@ -1,117 +0,0 @@ -package io.trygvis; - -import io.trygvis.async.AsyncService; -import io.trygvis.queue.Queue; -import io.trygvis.queue.Task; -import org.hibernate.dialect.PostgreSQL82Dialect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.bridge.SLF4JBridgeHandler; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import static java.lang.System.*; -import static java.lang.Thread.sleep; - -@Component -public class Main { - private static final Logger log = LoggerFactory.getLogger(Main.class); - - public static void main(String[] args) throws Exception { - SLF4JBridgeHandler.install(); - - String username = getProperty("user.name"); - setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); - setProperty("database.username", username); - setProperty("database.password", username); -// setProperty("hibernate.showSql", "true"); - setProperty("hibernate.hbm2ddl.auto", "create"); // create - setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName()); - - log.info("Starting context"); - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); - log.info("Started context"); - - try { - context.getBean(Main.class).run(); -// log.info("Sleeping"); -// sleep(1000 * 1000); - } catch (Exception e) { - e.printStackTrace(System.out); - } - - log.info("Stopping context"); - context.stop(); - log.info("Stopped context"); - - exit(0); - } - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private AsyncService asyncService; - - @Autowired - @Qualifier("createArticle") - private AsyncService.AsyncCallable createArticleCallable; - - @Autowired - @Qualifier("updateArticle") - private AsyncService.AsyncCallable updateArticleCallable; - - public void run() throws Exception { - log.info("Main.run"); - - final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable); -// log.info("queue registered: ref = {}", q); -// asyncService.registerQueue("update-queue", 1, updateArticleCallable); - -// q = asyncService.getQueue("create-queue"); - - final List tasks = new ArrayList<>(); - - final int count = 1; - log.info("Creating {} tasks", count); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - for (int i = 0; i < count; i++) { - tasks.add(asyncService.schedule(q)); - } - } - }); - log.info("Created {} tasks", count); - - while (true) { - sleep(10000); - - log.info("Checking for status of {} tasks", tasks.size()); - for (Iterator iterator = tasks.iterator(); iterator.hasNext(); ) { - Task task = iterator.next(); - - task = asyncService.update(task); - -// log.info("task = {}", task); - - if (task.isDone()) { - iterator.remove(); - } - } - - if (tasks.isEmpty()) { - log.info("No more tasks"); - break; - } - } - } -} diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java deleted file mode 100755 index a910855..0000000 --- a/src/main/java/io/trygvis/UpdateArticleCallable.java +++ /dev/null @@ -1,44 +0,0 @@ -package io.trygvis; - -import io.trygvis.async.AsyncService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import java.util.Date; -import java.util.List; -import java.util.Random; - -@Component("updateArticle") -public class UpdateArticleCallable - implements AsyncService.AsyncCallable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final Random r = new Random(); - -// @PersistenceContext -// private EntityManager entityManager; - -// @Transactional(propagation = REQUIRES_NEW) - public void run(List arguments) throws Exception { - log.info("UpdateArticeJob.run: BEGIN"); - - Date now = new Date(); - - log.info("now = {}", now); - -/* - TypedQuery
q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); - - List
list = q.getResultList(); - log.info("Got {} articles", list.size()); - - Article a = list.get(r.nextInt(list.size())); - a.setUpdated(new Date()); - - entityManager.persist(a); -*/ - - log.info("UpdateArticeJob.run: END"); - } -} diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index e90a0e4..57c1af8 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -2,8 +2,8 @@ package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; -import org.quartz.SchedulerException; +import java.sql.SQLException; import java.util.List; /** @@ -16,9 +16,8 @@ public interface AsyncService { * @param interval how often the queue should be polled for missed tasks in seconds. * @param callable * @return - * @throws SchedulerException */ - Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + Queue registerQueue(final String name, final int interval, AsyncCallable callable); Queue getQueue(String name); diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 4e78a37..c34330e 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -4,88 +4,66 @@ import io.trygvis.queue.Queue; import io.trygvis.queue.QueueDao; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; -import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; -@Component -public class JdbcAsyncService implements AsyncService { +public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - private final Map queues = new HashMap<>(); - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueDao queueDao; - - @Autowired - private TaskDao taskDao; + public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException { + QueueDao queueDao = new QueueDao(c); - @Transactional(propagation = REQUIRED) - public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { log.info("registerQueue: ENTER"); Queue q = queueDao.findByName(name); log.info("q = {}", q); - final long interval_; if (q == null) { q = new Queue(name, interval); queueDao.insert(q); - interval_ = interval; - } else { - // Found an existing queue. Use the Settings from the database. - interval_ = q.interval; } - final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); + final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q); queues.put(name, queueThread); - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - log.info("Transaction completed with status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - log.info("Starting thread for queue {} with poll interval = {}s", name, interval); - executor.scheduleAtFixedRate(new Runnable() { - public void run() { - queueThread.ping(); - } - }, 10, interval_, SECONDS); - Thread thread = new Thread(queueThread, name); - thread.setDaemon(true); - thread.start(); - } - } - }); - log.info("registerQueue: LEAVE"); return q; } + public void startQueue(ScheduledThreadPoolExecutor executor, String name) { + final QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: " + name); + } + + long interval = queueThread.queue.interval; + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 10, interval, SECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + public Queue getQueue(String name) { QueueThread queueThread = queues.get(name); @@ -96,16 +74,17 @@ public class JdbcAsyncService implements AsyncService { return queueThread.queue; } - @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, String... args) { - return scheduleInner(null, queue, args); + public Task schedule(Connection c, final Queue queue, String... args) throws SQLException { + return scheduleInner(c, null, queue, args); } - public Task schedule(long parent, Queue queue, String... args) { - return scheduleInner(parent, queue, args); + public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException { + return scheduleInner(c, parent, queue, args); } - private Task scheduleInner(Long parent, final Queue queue, String... args) { + private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException { + TaskDao taskDao = new TaskDao(c); + Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); @@ -114,27 +93,22 @@ public class JdbcAsyncService implements AsyncService { } long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); - Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); + Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args)); log.info("Created task = {}", task); - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - if (status == TransactionSynchronization.STATUS_COMMITTED) { - queues.get(queue.name).ping(); - } - } - }); - return task; } - @Transactional - public Task await(Task task, long timeout) { + public Task await(Connection c, Task task, long timeout) throws SQLException { final long start = currentTimeMillis(); final long end = start + timeout; while (currentTimeMillis() < end) { - task = update(task); + task = update(c, task); + + if (task == null) { + throw new RuntimeException("The task went away."); + } try { sleep(100); @@ -146,8 +120,9 @@ public class JdbcAsyncService implements AsyncService { return task; } - @Transactional(readOnly = true) - public Task update(Task ref) { + public Task update(Connection c, Task ref) throws SQLException { + TaskDao taskDao = new TaskDao(c); + return taskDao.findById(ref.id); } } diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 69466df..00c46b4 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -5,37 +5,33 @@ import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException; + class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); - public boolean shouldRun = true; - - private boolean checkForNewTasks; + private final SqlEffectExecutor sqlEffectExecutor; - private boolean busy; + private final AsyncService.AsyncCallable callable; public final Queue queue; - private final TaskDao taskDao; + public boolean shouldRun = true; - private final TransactionTemplate transactionTemplate; + private boolean checkForNewTasks; - private final AsyncService.AsyncCallable callable; + private boolean busy; - QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { - this.queue = queue; - this.taskDao = taskDao; - this.transactionTemplate = transactionTemplate; + QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) { + this.sqlEffectExecutor = sqlEffectExecutor; this.callable = callable; + this.queue = queue; } public void ping() { @@ -52,19 +48,25 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { - List tasks = transactionTemplate.execute(new TransactionCallback>() { - public List doInTransaction(TransactionStatus status) { - return taskDao.findByNameAndCompletedIsNull(queue.name); +// List tasks = transactionTemplate.execute(new TransactionCallback>() { +// public List doInTransaction(TransactionStatus status) { +// return taskDao.findByNameAndCompletedIsNull(queue.name); +// } +// }); + List tasks = sqlEffectExecutor.execute(new SqlEffect>() { + @Override + public List doInConnection(Connection connection) throws SQLException { + return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - if(tasks.size() > 0) { + if (tasks.size() > 0) { for (final Task task : tasks) { try { executeTask(task); - } catch (TransactionException | TaskFailureException e) { + } catch (SqlExecutionException | TaskFailureException e) { log.warn("Task execution failed", e); } } @@ -96,24 +98,44 @@ class QueueThread implements Runnable { private void executeTask(final Task task) { final Date run = new Date(); log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); + sqlEffectExecutor.execute(new SqlEffect.Void() { + @Override + public void doInConnection(Connection connection) throws SQLException { + new TaskDao(connection).update(task.registerRun()); } }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// taskDao.update(task.registerRun()); +// } +// }); + + sqlEffectExecutor.execute(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { try { callable.run(task.arguments); Date completed = new Date(); Task t = task.registerComplete(completed); log.info("Completed task: {}", t); - taskDao.update(t); + new TaskDao(c).update(t); } catch (Exception e) { throw new TaskFailureException(e); } } }); +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// try { +// callable.run(task.arguments); +// Date completed = new Date(); +// Task t = task.registerComplete(completed); +// log.info("Completed task: {}", t); +// taskDao.update(t); +// } catch (Exception e) { +// throw new TaskFailureException(e); +// } +// } +// }); } } diff --git a/src/main/java/io/trygvis/async/SqlEffect.java b/src/main/java/io/trygvis/async/SqlEffect.java new file mode 100644 index 0000000..d0c4e9b --- /dev/null +++ b/src/main/java/io/trygvis/async/SqlEffect.java @@ -0,0 +1,12 @@ +package io.trygvis.async; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface SqlEffect { + A doInConnection(Connection c) throws SQLException; + + interface Void { + void doInConnection(Connection c) throws SQLException; + } +} diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java new file mode 100644 index 0000000..c8abbd3 --- /dev/null +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -0,0 +1,39 @@ +package io.trygvis.async; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlEffectExecutor { + + private final DataSource dataSource; + + public SqlEffectExecutor(DataSource dataSource) { + this.dataSource = dataSource; + } + + public A execute(SqlEffect effect) { + try (Connection c = dataSource.getConnection()) { + return effect.doInConnection(c); + } catch (SQLException e) { + throw new SqlExecutionException(e); + } + } + + public void execute(SqlEffect.Void effect) { + try (Connection c = dataSource.getConnection()) { + effect.doInConnection(c); + } catch (SQLException e) { + throw new SqlExecutionException(e); + } + } + + public static class SqlExecutionException extends RuntimeException { + public final SQLException exception; + + public SqlExecutionException(SQLException ex) { + super(ex); + this.exception = ex; + } + } +} diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..8517c68 --- /dev/null +++ b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java @@ -0,0 +1,102 @@ +package io.trygvis.async.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.annotation.PostConstruct; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final TransactionTemplate transactionTemplate; + + private final JdbcTemplate jdbcTemplate; + + private SqlEffectExecutor sqlEffectExecutor; + + final JdbcAsyncService jdbcAsyncService; + + public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { + this.transactionTemplate = transactionTemplate; + this.jdbcTemplate = jdbcTemplate; + jdbcAsyncService = new JdbcAsyncService(); + sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); + } + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Queue doInConnection(Connection c) throws SQLException { + + Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + jdbcAsyncService.startQueue(executor, name); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + }); + } + + public Queue getQueue(String name) { + return jdbcAsyncService.getQueue(name); + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, final String... args) { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, queue, args); + } + }); + } + + public Task schedule(final long parent, final Queue queue, final String... args) { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, parent, queue, args); + } + }); + } + + @Transactional(readOnly = true) + public Task update(final Task ref) { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.update(c, ref); + } + }); + } +} diff --git a/src/main/java/io/trygvis/model/Article.java b/src/main/java/io/trygvis/model/Article.java deleted file mode 100755 index e86c570..0000000 --- a/src/main/java/io/trygvis/model/Article.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.trygvis.model; - -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; -import javax.persistence.Id; -import javax.persistence.SequenceGenerator; -import java.util.Date; - -@Entity -public class Article { - @Id - @SequenceGenerator(name="id_seq", sequenceName="id_seq") - @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_seq") - private Integer id; - private Date created; - private Date updated; - private String title; - private String body; - - @SuppressWarnings("UnusedDeclaration") - private Article() { - } - - public Article(Date created, Date updated, String title, String body) { - this.created = created; - this.updated = updated; - this.title = title; - this.body = body; - } - - public Integer getId() { - return id; - } - - public Date getCreated() { - return created; - } - - public Date getUpdated() { - return updated; - } - - public void setUpdated(Date updated) { - this.updated = updated; - } - - public String getTitle() { - return title; - } - - public String getBody() { - return body; - } -} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java index 63dde2a..2f69e11 100644 --- a/src/main/java/io/trygvis/queue/QueueDao.java +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -1,36 +1,45 @@ package io.trygvis.queue; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.stereotype.Component; - +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import static org.springframework.dao.support.DataAccessUtils.singleResult; - -@Component public class QueueDao { - @Autowired - private JdbcTemplate jdbcTemplate; + private final Connection connection; - public Queue findByName(String name) { - return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name)); + public QueueDao(Connection connection) { + this.connection = connection; } - public void insert(Queue q) { - jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval); + public Queue findByName(String name) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) { + stmt.setString(1, name); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } } - public void update(Queue q) { - jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name); + public void insert(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) { + int i = 1; + stmt.setString(i++, q.name); + stmt.setLong(i, q.interval); + stmt.executeUpdate(); + } } - private class QueueRowMapper implements RowMapper { - public Queue mapRow(ResultSet rs, int rowNum) throws SQLException { - return new Queue(rs.getString(1), rs.getLong(2)); + public void update(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) { + int i = 1; + stmt.setLong(i++, q.interval); + stmt.setString(i, q.name); + stmt.executeUpdate(); } } + + public Queue mapRow(ResultSet rs) throws SQLException { + return new Queue(rs.getString(1), rs.getLong(2)); + } } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index a59dcbb..5459933 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -1,69 +1,104 @@ package io.trygvis.queue; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; import static java.util.Arrays.asList; -import static org.springframework.transaction.annotation.Propagation.MANDATORY; -@Component public class TaskDao { - @Autowired - private JdbcTemplate jdbcTemplate; + private final Connection connection; + + public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments"; - public long insert(String queue, Date scheduled, String arguments) { - return this.insert(null, queue, scheduled, arguments); + public TaskDao(Connection connection) { + this.connection = connection; } - @Transactional(propagation = MANDATORY) - public long insert(Long parent, String queue, Date scheduled, String arguments) { - jdbcTemplate.update("INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + - "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)", parent, queue, scheduled, arguments); - return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); + public long insert(String queue, Date scheduled, String arguments) throws SQLException { + return insert(null, queue, scheduled, arguments); } - @Transactional(propagation = MANDATORY) - public Task findById(long id) { - return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", - new TaskRowMapper(), id); + public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException { + String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)"; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + if (parent == null) { + stmt.setNull(i++, Types.BIGINT); + } else { + stmt.setLong(i++, parent); + } + stmt.setString(i++, queue); + stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); + stmt.setString(i, arguments); + stmt.executeUpdate(); + } + try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) { + ResultSet rs = stmt.executeQuery(); + rs.next(); + return rs.getLong(1); + } } - @Transactional(propagation = MANDATORY) - public List findByNameAndCompletedIsNull(String name) { - return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", - new TaskRowMapper(), name); + public Task findById(long id) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } } - @Transactional(propagation = MANDATORY) - public void update(Task task) { - jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", - task.scheduled, task.lastRun, task.runCount, task.completed, task.id); + public List findByNameAndCompletedIsNull(String name) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) { + int i = 1; + stmt.setString(i, name); + ResultSet rs = stmt.executeQuery(); + List list = new ArrayList<>(); + while (rs.next()) { + list.add(mapRow(rs)); + } + return list; + } } - private class TaskRowMapper implements RowMapper { - public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments"; + public void update(Task task) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { + int i = 1; + stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); + setTimestamp(stmt, i++, task.lastRun); + stmt.setInt(i++, task.runCount); + setTimestamp(stmt, i++, task.completed); + stmt.setLong(i, task.id); + stmt.executeUpdate(); + } + } - public Task mapRow(ResultSet rs, int rowNum) throws SQLException { - String arguments = rs.getString(8); - return new Task( - rs.getLong(1), - rs.getLong(2), - rs.getString(3), - rs.getTimestamp(4), - rs.getTimestamp(5), - rs.getInt(6), - rs.getTimestamp(7), - arguments != null ? asList(arguments.split(" ")) : Collections.emptyList()); + private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { + if (date == null) { + stmt.setNull(parameterIndex, Types.TIMESTAMP); + } else { + stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); } } + + public Task mapRow(ResultSet rs) throws SQLException { + String arguments = rs.getString(8); + return new Task( + rs.getLong(1), + rs.getLong(2), + rs.getString(3), + rs.getTimestamp(4), + rs.getTimestamp(5), + rs.getInt(6), + rs.getTimestamp(7), + arguments != null ? asList(arguments.split(" ")) : Collections.emptyList()); + } } diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java deleted file mode 100755 index df4b2e2..0000000 --- a/src/main/java/io/trygvis/spring/Config.java +++ /dev/null @@ -1,172 +0,0 @@ -package io.trygvis.spring; - -import com.jolbox.bonecp.BoneCPDataSource; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; -import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.annotation.EnableTransactionManagement; -import org.springframework.transaction.support.TransactionTemplate; - -import javax.sql.DataSource; - -@Configuration -@ComponentScan(basePackages = "io.trygvis") -@EnableTransactionManagement -//@EnableJpaRepositories(basePackages = "io.trygvis.data") -public class Config { - - @Bean - public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { - return new PropertySourcesPlaceholderConfigurer() {{ -// setLocation(new UrlResource("file:environment.properties")); - setProperties(System.getProperties()); - setLocalOverride(true); - }}; - } - - @Bean - public JdbcTemplate jdbcTemplate(DataSource dataSource) { - return new JdbcTemplate(dataSource); - } - -// public SpringBeanJobFactory springBeanJobFactory() { -// SpringBeanJobFactory factory = new SpringBeanJobFactory(); -// return factory; -// } - -/* - @Bean - public SchedulerFactoryBean quartz(DataSource dataSource, PlatformTransactionManager transactionManager) { - SchedulerFactoryBean bean = new SchedulerFactoryBean(); - bean.setApplicationContextSchedulerContextKey("applicationContext"); - bean.setDataSource(dataSource); - bean.setTransactionManager(transactionManager); -// bean.setJobFactory(new JobFactory() { -// public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException { -// Class klass = bundle.getJobDetail().getJobClass(); -// } -// }); - - Properties quartzProperties = new Properties(); - quartzProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true"); -// quartzProperties.setProperty("org.quartz.jobStore.selectWithLockSQL", "false"); - quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); - quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); - bean.setQuartzProperties(quartzProperties); - return bean; - } -*/ - - // This turns out to be fairly useless as Spring won't register them automatically. - // It's probably better to use @Scheduled/@Async instead - /* - @Bean(name = "my-job") - public JobDetailFactoryBean myJobDetailBean() { - JobDetailFactoryBean bean = new JobDetailFactoryBean(); - bean.setJobClass(MyJob.class); - bean.setDurability(true); - - return bean; - } - - @Bean - public CronTriggerFactoryBean myJobTrigger(JobDetail jobDetail) { - CronTriggerFactoryBean bean = new CronTriggerFactoryBean(); - bean.setName("my-trigger"); - bean.setBeanName("my-job"); - bean.setJobDetail(jobDetail); - bean.setCronExpression("0/10 * * * * ?"); - - return bean; - } - */ - - @Bean - public DataSource dataSource(@Value("${database.url}") String jdbcUrl, - @Value("${database.username}") String username, - @Value("${database.password}") String password) { - BoneCPDataSource ds = new BoneCPDataSource(); - - ds.setLogStatementsEnabled(true); - - ds.setJdbcUrl(jdbcUrl); - ds.setUsername(username); - ds.setPassword(password); - - ds.setIdleConnectionTestPeriodInSeconds(60); - ds.setIdleMaxAgeInSeconds(240); - ds.setMaxConnectionsPerPartition(40); - ds.setMinConnectionsPerPartition(0); - ds.setPartitionCount(1); - ds.setAcquireIncrement(1); - ds.setStatementsCacheSize(1000); - ds.setReleaseHelperThreads(3); - ds.setStatisticsEnabled(true); - return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); - } - -/* - @Bean - public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource, - @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl, - @Value("${hibernate.showSql:false}") boolean showSql, - @Value("${hibernate.dialect}") String dialect) { - LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean(); - x.setDataSource(dataSource); - x.setJpaPropertyMap(createJpaMap(hbm2ddl, showSql, dialect)); - x.setPackagesToScan(Article.class.getPackage().getName()); - HibernatePersistence persistenceProvider = new HibernatePersistence(); - x.setPersistenceProvider(persistenceProvider); - return x; - } - - public static Map createJpaMap(String hbm2ddl, boolean showSql, String dialect) { - Map map = new HashMap<>(); - map.put(HBM2DDL_AUTO, hbm2ddl); - map.put(FORMAT_SQL, showSql); - map.put(SHOW_SQL, showSql); - map.put(USE_SQL_COMMENTS, showSql); - map.put(GENERATE_STATISTICS, true); - map.put(NAMING_STRATEGY, ImprovedNamingStrategy.class.getName()); - - map.put(DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_WRITE.toString()); - map.put(CURRENT_SESSION_CONTEXT_CLASS, SpringSessionContext.class.getName()); -// map.put(CACHE_REGION_FACTORY, EhCacheRegionFactory.class.getName()); - map.put(USE_SECOND_LEVEL_CACHE, false); - map.put(USE_QUERY_CACHE, false); -// map.put(SHARED_CACHE_MODE, SharedCacheMode.ENABLE_SELECTIVE.toString()); - - map.put(DIALECT, dialect); - map.put("hibernate.temp.use_jdbc_metadata_defaults", "false"); - - return map; - } - - @Bean - public SessionFactory sessionFactory(LocalContainerEntityManagerFactoryBean entityManagerFactory) { - return ((HibernateEntityManagerFactory) entityManagerFactory.nativeEntityManagerFactory).getSessionFactory(); - } - - @Bean - public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { - return new JpaTransactionManager(entityManagerFactory); - } -*/ - - @Bean - public PlatformTransactionManager transactionManager(DataSource dataSource) { - return new DataSourceTransactionManager(dataSource); - } - - @Bean - public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { - return new TransactionTemplate(platformTransactionManager); - } -} diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java new file mode 100644 index 0000000..af8f644 --- /dev/null +++ b/src/main/java/io/trygvis/spring/DefaultConfig.java @@ -0,0 +1,17 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.spring.SpringJdbcAsyncService; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.support.TransactionTemplate; + +@Configuration +public class DefaultConfig { + + @Bean + public AsyncService asyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { + return new SpringJdbcAsyncService(transactionTemplate, jdbcTemplate); + } +} diff --git a/src/main/resources/applicationContext.xml b/src/main/resources/applicationContext.xml deleted file mode 100755 index 2d33d8c..0000000 --- a/src/main/resources/applicationContext.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - - diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql new file mode 100644 index 0000000..39672f0 --- /dev/null +++ b/src/main/resources/create-postgresql.sql @@ -0,0 +1,29 @@ +BEGIN; + +DROP TABLE IF EXISTS task; +DROP TABLE IF EXISTS queue; +DROP SEQUENCE IF EXISTS task_seq; + +CREATE TABLE queue ( + name VARCHAR(100) NOT NULL, + interval INTEGER NOT NULL, + CONSTRAINT pk_queue PRIMARY KEY (name) +); + +CREATE TABLE task ( + id BIGINT NOT NULL, + parent BIGINT, + queue VARCHAR(100) NOT NULL, + scheduled TIMESTAMP NOT NULL, + last_run TIMESTAMP, + run_count INT NOT NULL, + completed TIMESTAMP, + arguments VARCHAR(100), + CONSTRAINT pk_task PRIMARY KEY (id), + CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name), + CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id) +); + +CREATE SEQUENCE task_seq; + +COMMIT; diff --git a/src/main/resources/create.sql b/src/main/resources/create.sql deleted file mode 100644 index f7f2939..0000000 --- a/src/main/resources/create.sql +++ /dev/null @@ -1,29 +0,0 @@ -BEGIN; - -DROP TABLE IF EXISTS task; -DROP TABLE IF EXISTS queue; -DROP SEQUENCE IF EXISTS task_id; - -CREATE TABLE queue ( - name VARCHAR(100) NOT NULL, - interval INTEGER NOT NULL, - CONSTRAINT pk_queue PRIMARY KEY (name) -); - -CREATE TABLE task ( - id BIGINT NOT NULL, - parent BIGINT NOT NULL, - queue VARCHAR(100) NOT NULL, - scheduled TIMESTAMP NOT NULL, - last_run TIMESTAMP, - run_count INT NOT NULL, - completed TIMESTAMP, - arguments VARCHAR(100), - CONSTRAINT pk_task PRIMARY KEY (id), - CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name), - CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id) -); - -CREATE SEQUENCE task_id; - -COMMIT; diff --git a/src/test/java/io/trygvis/test/Article.java b/src/test/java/io/trygvis/test/Article.java new file mode 100755 index 0000000..d4f54ce --- /dev/null +++ b/src/test/java/io/trygvis/test/Article.java @@ -0,0 +1,55 @@ +package io.trygvis.test; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.SequenceGenerator; +import java.util.Date; + +@Entity +public class Article { + @Id + @SequenceGenerator(name="id_seq", sequenceName="id_seq") + @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_seq") + private Integer id; + private Date created; + private Date updated; + private String title; + private String body; + + @SuppressWarnings("UnusedDeclaration") + private Article() { + } + + public Article(Date created, Date updated, String title, String body) { + this.created = created; + this.updated = updated; + this.title = title; + this.body = body; + } + + public Integer getId() { + return id; + } + + public Date getCreated() { + return created; + } + + public Date getUpdated() { + return updated; + } + + public void setUpdated(Date updated) { + this.updated = updated; + } + + public String getTitle() { + return title; + } + + public String getBody() { + return body; + } +} diff --git a/src/test/java/io/trygvis/test/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java new file mode 100755 index 0000000..f68cd5b --- /dev/null +++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java @@ -0,0 +1,41 @@ +package io.trygvis.test; + +import io.trygvis.async.AsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Date; +import java.util.List; +import java.util.Random; + +import static org.springframework.transaction.annotation.Propagation.MANDATORY; + +@Component("createArticle") +@Transactional(propagation = MANDATORY) +public class CreateArticleCallable implements AsyncService.AsyncCallable { + private final Logger log = LoggerFactory.getLogger(getClass()); + +// @PersistenceContext +// private EntityManager entityManager; + + private Random random = new Random(); + + public void run(List arguments) throws Exception { + log.info("CreateArticeJob.run: BEGIN"); + + if (random.nextInt() % 3 == 0) { + throw new RuntimeException("failing create article"); + } + + Date now = new Date(); + + log.info("now = {}", now); + + Article article = new Article(new Date(), null, "title", "body"); +// entityManager.persist(article); + + log.info("CreateArticeJob.run: END"); + } +} diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java new file mode 100755 index 0000000..721df61 --- /dev/null +++ b/src/test/java/io/trygvis/test/Main.java @@ -0,0 +1,115 @@ +package io.trygvis.test; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import org.hibernate.dialect.PostgreSQL82Dialect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static java.lang.System.*; +import static java.lang.Thread.sleep; + +@Component +public class Main { + private static final Logger log = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) throws Exception { + SLF4JBridgeHandler.install(); + + String username = getProperty("user.name"); + setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); + setProperty("database.username", username); + setProperty("database.password", username); +// setProperty("hibernate.showSql", "true"); + setProperty("hibernate.hbm2ddl.auto", "create"); // create + setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName()); + + log.info("Starting context"); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); + log.info("Started context"); + + try { + context.getBean(Main.class).run(); +// log.info("Sleeping"); +// sleep(1000 * 1000); + } catch (Exception e) { + e.printStackTrace(System.out); + } + + log.info("Stopping context"); + context.stop(); + log.info("Stopped context"); + + exit(0); + } + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private AsyncService asyncService; + + @Autowired + @Qualifier("createArticle") + private AsyncService.AsyncCallable createArticleCallable; + + @Autowired + @Qualifier("updateArticle") + private AsyncService.AsyncCallable updateArticleCallable; + + public void run() throws Exception { + log.info("Main.run"); + + final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable); +// log.info("queue registered: ref = {}", q); +// asyncService.registerQueue("update-queue", 1, updateArticleCallable); + +// q = asyncService.getQueue("create-queue"); + + final List tasks = new ArrayList<>(); + + final int count = 1; + log.info("Creating {} tasks", count); +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// for (int i = 0; i < count; i++) { +// tasks.add(asyncService.schedule(q)); +// } +// } +// }); + log.info("Created {} tasks", count); + + while (true) { + sleep(10000); + + log.info("Checking for status of {} tasks", tasks.size()); + for (Iterator iterator = tasks.iterator(); iterator.hasNext(); ) { + Task task = iterator.next(); + + task = asyncService.update(task); + +// log.info("task = {}", task); + + if (task.isDone()) { + iterator.remove(); + } + } + + if (tasks.isEmpty()) { + log.info("No more tasks"); + break; + } + } + } +} diff --git a/src/test/java/io/trygvis/test/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java new file mode 100755 index 0000000..aae28b9 --- /dev/null +++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java @@ -0,0 +1,44 @@ +package io.trygvis.test; + +import io.trygvis.async.AsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; +import java.util.Random; + +@Component("updateArticle") +public class UpdateArticleCallable + implements AsyncService.AsyncCallable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final Random r = new Random(); + +// @PersistenceContext +// private EntityManager entityManager; + +// @Transactional(propagation = REQUIRES_NEW) + public void run(List arguments) throws Exception { + log.info("UpdateArticeJob.run: BEGIN"); + + Date now = new Date(); + + log.info("now = {}", now); + +/* + TypedQuery
q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); + + List
list = q.getResultList(); + log.info("Got {} articles", list.size()); + + Article a = list.get(r.nextInt(list.size())); + a.setUpdated(new Date()); + + entityManager.persist(a); +*/ + + log.info("UpdateArticeJob.run: END"); + } +} diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java new file mode 100644 index 0000000..9a7a436 --- /dev/null +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -0,0 +1,59 @@ +package io.trygvis.test.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.Queue; +import io.trygvis.spring.DefaultConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static java.lang.System.getProperty; +import static java.lang.System.setProperty; +import static org.fest.assertions.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {TestConfig.class, DefaultConfig.class}) +public class PlainSpringTest { + + @Autowired + private AsyncService asyncService; + + static { + String username = getProperty("user.name"); + setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); + setProperty("database.username", username); + setProperty("database.password", username); + } + + @Test + public void testBasic() throws SQLException, InterruptedException { + final AtomicReference> ref = new AtomicReference<>(); + Queue test = asyncService.registerQueue("test", 10, new AsyncService.AsyncCallable() { + public void run(List arguments) throws Exception { + System.out.println("PlainSpringTest.run"); + ref.set(arguments); + synchronized (ref) { + ref.notify(); + } + } + }); + + synchronized (ref) { + System.out.println("Scheduling task"); + asyncService.schedule(test, "hello", "world"); + System.out.println("Waiting"); + ref.wait(1000); + } + + List args = ref.get(); + assertNotNull(args); + assertThat(args).containsExactly("hello", "world"); + } +} diff --git a/src/test/java/io/trygvis/test/spring/TestConfig.java b/src/test/java/io/trygvis/test/spring/TestConfig.java new file mode 100755 index 0000000..7853cb5 --- /dev/null +++ b/src/test/java/io/trygvis/test/spring/TestConfig.java @@ -0,0 +1,172 @@ +package io.trygvis.test.spring; + +import com.jolbox.bonecp.BoneCPDataSource; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.sql.DataSource; + +@Configuration +@ComponentScan(basePackages = "io.trygvis") +@EnableTransactionManagement +//@EnableJpaRepositories(basePackages = "io.trygvis.data") +public class TestConfig { + + @Bean + public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { + return new PropertySourcesPlaceholderConfigurer() {{ +// setLocation(new UrlResource("file:environment.properties")); + setProperties(System.getProperties()); + setLocalOverride(true); + }}; + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + +// public SpringBeanJobFactory springBeanJobFactory() { +// SpringBeanJobFactory factory = new SpringBeanJobFactory(); +// return factory; +// } + +/* + @Bean + public SchedulerFactoryBean quartz(DataSource dataSource, PlatformTransactionManager transactionManager) { + SchedulerFactoryBean bean = new SchedulerFactoryBean(); + bean.setApplicationContextSchedulerContextKey("applicationContext"); + bean.setDataSource(dataSource); + bean.setTransactionManager(transactionManager); +// bean.setJobFactory(new JobFactory() { +// public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException { +// Class klass = bundle.getJobDetail().getJobClass(); +// } +// }); + + Properties quartzProperties = new Properties(); + quartzProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true"); +// quartzProperties.setProperty("org.quartz.jobStore.selectWithLockSQL", "false"); + quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); + quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); + bean.setQuartzProperties(quartzProperties); + return bean; + } +*/ + + // This turns out to be fairly useless as Spring won't register them automatically. + // It's probably better to use @Scheduled/@Async instead + /* + @Bean(name = "my-job") + public JobDetailFactoryBean myJobDetailBean() { + JobDetailFactoryBean bean = new JobDetailFactoryBean(); + bean.setJobClass(MyJob.class); + bean.setDurability(true); + + return bean; + } + + @Bean + public CronTriggerFactoryBean myJobTrigger(JobDetail jobDetail) { + CronTriggerFactoryBean bean = new CronTriggerFactoryBean(); + bean.setName("my-trigger"); + bean.setBeanName("my-job"); + bean.setJobDetail(jobDetail); + bean.setCronExpression("0/10 * * * * ?"); + + return bean; + } + */ + + @Bean + public DataSource dataSource(@Value("${database.url}") String jdbcUrl, + @Value("${database.username}") String username, + @Value("${database.password}") String password) { + BoneCPDataSource ds = new BoneCPDataSource(); + + ds.setLogStatementsEnabled(true); + + ds.setJdbcUrl(jdbcUrl); + ds.setUsername(username); + ds.setPassword(password); + + ds.setIdleConnectionTestPeriodInSeconds(60); + ds.setIdleMaxAgeInSeconds(240); + ds.setMaxConnectionsPerPartition(40); + ds.setMinConnectionsPerPartition(0); + ds.setPartitionCount(1); + ds.setAcquireIncrement(1); + ds.setStatementsCacheSize(1000); + ds.setReleaseHelperThreads(3); + ds.setStatisticsEnabled(true); + return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); + } + +/* + @Bean + public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource, + @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl, + @Value("${hibernate.showSql:false}") boolean showSql, + @Value("${hibernate.dialect}") String dialect) { + LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean(); + x.setDataSource(dataSource); + x.setJpaPropertyMap(createJpaMap(hbm2ddl, showSql, dialect)); + x.setPackagesToScan(Article.class.getPackage().getName()); + HibernatePersistence persistenceProvider = new HibernatePersistence(); + x.setPersistenceProvider(persistenceProvider); + return x; + } + + public static Map createJpaMap(String hbm2ddl, boolean showSql, String dialect) { + Map map = new HashMap<>(); + map.put(HBM2DDL_AUTO, hbm2ddl); + map.put(FORMAT_SQL, showSql); + map.put(SHOW_SQL, showSql); + map.put(USE_SQL_COMMENTS, showSql); + map.put(GENERATE_STATISTICS, true); + map.put(NAMING_STRATEGY, ImprovedNamingStrategy.class.getName()); + + map.put(DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_WRITE.toString()); + map.put(CURRENT_SESSION_CONTEXT_CLASS, SpringSessionContext.class.getName()); +// map.put(CACHE_REGION_FACTORY, EhCacheRegionFactory.class.getName()); + map.put(USE_SECOND_LEVEL_CACHE, false); + map.put(USE_QUERY_CACHE, false); +// map.put(SHARED_CACHE_MODE, SharedCacheMode.ENABLE_SELECTIVE.toString()); + + map.put(DIALECT, dialect); + map.put("hibernate.temp.use_jdbc_metadata_defaults", "false"); + + return map; + } + + @Bean + public SessionFactory sessionFactory(LocalContainerEntityManagerFactoryBean entityManagerFactory) { + return ((HibernateEntityManagerFactory) entityManagerFactory.nativeEntityManagerFactory).getSessionFactory(); + } + + @Bean + public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { + return new JpaTransactionManager(entityManagerFactory); + } +*/ + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { + return new TransactionTemplate(platformTransactionManager); + } +} diff --git a/src/test/resources/applicationContext.xml b/src/test/resources/applicationContext.xml new file mode 100755 index 0000000..5f173b3 --- /dev/null +++ b/src/test/resources/applicationContext.xml @@ -0,0 +1,22 @@ + + + + + + -- cgit v1.2.3