diff options
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rwxr-xr-x | src/main/java/io/trygvis/async/AsyncService.java | 5 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 109 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 78 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/SqlEffect.java | 12 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/SqlEffectExecutor.java | 39 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java | 102 |
6 files changed, 247 insertions, 98 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index e90a0e4..57c1af8 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -2,8 +2,8 @@ package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; -import org.quartz.SchedulerException; +import java.sql.SQLException; import java.util.List; /** @@ -16,9 +16,8 @@ public interface AsyncService { * @param interval how often the queue should be polled for missed tasks in seconds. * @param callable * @return - * @throws SchedulerException */ - Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + Queue registerQueue(final String name, final int interval, AsyncCallable callable); Queue getQueue(String name); diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 4e78a37..c34330e 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -4,88 +4,66 @@ import io.trygvis.queue.Queue; import io.trygvis.queue.QueueDao; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; -import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; -@Component -public class JdbcAsyncService implements AsyncService { +public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - private final Map<String, QueueThread> queues = new HashMap<>(); - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueDao queueDao; - - @Autowired - private TaskDao taskDao; + public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException { + QueueDao queueDao = new QueueDao(c); - @Transactional(propagation = REQUIRED) - public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { log.info("registerQueue: ENTER"); Queue q = queueDao.findByName(name); log.info("q = {}", q); - final long interval_; if (q == null) { q = new Queue(name, interval); queueDao.insert(q); - interval_ = interval; - } else { - // Found an existing queue. Use the Settings from the database. - interval_ = q.interval; } - final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); + final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q); queues.put(name, queueThread); - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - log.info("Transaction completed with status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - log.info("Starting thread for queue {} with poll interval = {}s", name, interval); - executor.scheduleAtFixedRate(new Runnable() { - public void run() { - queueThread.ping(); - } - }, 10, interval_, SECONDS); - Thread thread = new Thread(queueThread, name); - thread.setDaemon(true); - thread.start(); - } - } - }); - log.info("registerQueue: LEAVE"); return q; } + public void startQueue(ScheduledThreadPoolExecutor executor, String name) { + final QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: " + name); + } + + long interval = queueThread.queue.interval; + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 10, interval, SECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + public Queue getQueue(String name) { QueueThread queueThread = queues.get(name); @@ -96,16 +74,17 @@ public class JdbcAsyncService implements AsyncService { return queueThread.queue; } - @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, String... args) { - return scheduleInner(null, queue, args); + public Task schedule(Connection c, final Queue queue, String... args) throws SQLException { + return scheduleInner(c, null, queue, args); } - public Task schedule(long parent, Queue queue, String... args) { - return scheduleInner(parent, queue, args); + public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException { + return scheduleInner(c, parent, queue, args); } - private Task scheduleInner(Long parent, final Queue queue, String... args) { + private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException { + TaskDao taskDao = new TaskDao(c); + Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); @@ -114,27 +93,22 @@ public class JdbcAsyncService implements AsyncService { } long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); - Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); + Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args)); log.info("Created task = {}", task); - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - if (status == TransactionSynchronization.STATUS_COMMITTED) { - queues.get(queue.name).ping(); - } - } - }); - return task; } - @Transactional - public Task await(Task task, long timeout) { + public Task await(Connection c, Task task, long timeout) throws SQLException { final long start = currentTimeMillis(); final long end = start + timeout; while (currentTimeMillis() < end) { - task = update(task); + task = update(c, task); + + if (task == null) { + throw new RuntimeException("The task went away."); + } try { sleep(100); @@ -146,8 +120,9 @@ public class JdbcAsyncService implements AsyncService { return task; } - @Transactional(readOnly = true) - public Task update(Task ref) { + public Task update(Connection c, Task ref) throws SQLException { + TaskDao taskDao = new TaskDao(c); + return taskDao.findById(ref.id); } } diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 69466df..00c46b4 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -5,37 +5,33 @@ import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException; + class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); - public boolean shouldRun = true; - - private boolean checkForNewTasks; + private final SqlEffectExecutor sqlEffectExecutor; - private boolean busy; + private final AsyncService.AsyncCallable callable; public final Queue queue; - private final TaskDao taskDao; + public boolean shouldRun = true; - private final TransactionTemplate transactionTemplate; + private boolean checkForNewTasks; - private final AsyncService.AsyncCallable callable; + private boolean busy; - QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { - this.queue = queue; - this.taskDao = taskDao; - this.transactionTemplate = transactionTemplate; + QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) { + this.sqlEffectExecutor = sqlEffectExecutor; this.callable = callable; + this.queue = queue; } public void ping() { @@ -52,19 +48,25 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { - List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { - public List<Task> doInTransaction(TransactionStatus status) { - return taskDao.findByNameAndCompletedIsNull(queue.name); +// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { +// public List<Task> doInTransaction(TransactionStatus status) { +// return taskDao.findByNameAndCompletedIsNull(queue.name); +// } +// }); + List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection connection) throws SQLException { + return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - if(tasks.size() > 0) { + if (tasks.size() > 0) { for (final Task task : tasks) { try { executeTask(task); - } catch (TransactionException | TaskFailureException e) { + } catch (SqlExecutionException | TaskFailureException e) { log.warn("Task execution failed", e); } } @@ -96,24 +98,44 @@ class QueueThread implements Runnable { private void executeTask(final Task task) { final Date run = new Date(); log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); + sqlEffectExecutor.execute(new SqlEffect.Void() { + @Override + public void doInConnection(Connection connection) throws SQLException { + new TaskDao(connection).update(task.registerRun()); } }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// taskDao.update(task.registerRun()); +// } +// }); + + sqlEffectExecutor.execute(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { try { callable.run(task.arguments); Date completed = new Date(); Task t = task.registerComplete(completed); log.info("Completed task: {}", t); - taskDao.update(t); + new TaskDao(c).update(t); } catch (Exception e) { throw new TaskFailureException(e); } } }); +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// try { +// callable.run(task.arguments); +// Date completed = new Date(); +// Task t = task.registerComplete(completed); +// log.info("Completed task: {}", t); +// taskDao.update(t); +// } catch (Exception e) { +// throw new TaskFailureException(e); +// } +// } +// }); } } diff --git a/src/main/java/io/trygvis/async/SqlEffect.java b/src/main/java/io/trygvis/async/SqlEffect.java new file mode 100644 index 0000000..d0c4e9b --- /dev/null +++ b/src/main/java/io/trygvis/async/SqlEffect.java @@ -0,0 +1,12 @@ +package io.trygvis.async; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface SqlEffect<A> { + A doInConnection(Connection c) throws SQLException; + + interface Void { + void doInConnection(Connection c) throws SQLException; + } +} diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java new file mode 100644 index 0000000..c8abbd3 --- /dev/null +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -0,0 +1,39 @@ +package io.trygvis.async; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlEffectExecutor { + + private final DataSource dataSource; + + public SqlEffectExecutor(DataSource dataSource) { + this.dataSource = dataSource; + } + + public <A> A execute(SqlEffect<A> effect) { + try (Connection c = dataSource.getConnection()) { + return effect.doInConnection(c); + } catch (SQLException e) { + throw new SqlExecutionException(e); + } + } + + public void execute(SqlEffect.Void effect) { + try (Connection c = dataSource.getConnection()) { + effect.doInConnection(c); + } catch (SQLException e) { + throw new SqlExecutionException(e); + } + } + + public static class SqlExecutionException extends RuntimeException { + public final SQLException exception; + + public SqlExecutionException(SQLException ex) { + super(ex); + this.exception = ex; + } + } +} diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..8517c68 --- /dev/null +++ b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java @@ -0,0 +1,102 @@ +package io.trygvis.async.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.annotation.PostConstruct; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final TransactionTemplate transactionTemplate; + + private final JdbcTemplate jdbcTemplate; + + private SqlEffectExecutor sqlEffectExecutor; + + final JdbcAsyncService jdbcAsyncService; + + public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { + this.transactionTemplate = transactionTemplate; + this.jdbcTemplate = jdbcTemplate; + jdbcAsyncService = new JdbcAsyncService(); + sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); + } + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) { + return jdbcTemplate.execute(new ConnectionCallback<Queue>() { + @Override + public Queue doInConnection(Connection c) throws SQLException { + + Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + jdbcAsyncService.startQueue(executor, name); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + }); + } + + public Queue getQueue(String name) { + return jdbcAsyncService.getQueue(name); + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, final String... args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, queue, args); + } + }); + } + + public Task schedule(final long parent, final Queue queue, final String... args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, parent, queue, args); + } + }); + } + + @Transactional(readOnly = true) + public Task update(final Task ref) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.update(c, ref); + } + }); + } +} |