aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java5
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java109
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java78
-rw-r--r--src/main/java/io/trygvis/async/SqlEffect.java12
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java39
-rw-r--r--src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java102
6 files changed, 247 insertions, 98 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index e90a0e4..57c1af8 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -2,8 +2,8 @@ package io.trygvis.async;
import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
-import org.quartz.SchedulerException;
+import java.sql.SQLException;
import java.util.List;
/**
@@ -16,9 +16,8 @@ public interface AsyncService {
* @param interval how often the queue should be polled for missed tasks in seconds.
* @param callable
* @return
- * @throws SchedulerException
*/
- Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+ Queue registerQueue(final String name, final int interval, AsyncCallable callable);
Queue getQueue(String name);
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 4e78a37..c34330e 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -4,88 +4,66 @@ import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueDao;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
-import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.TransactionSynchronization;
-import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.springframework.transaction.annotation.Propagation.REQUIRED;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
-@Component
-public class JdbcAsyncService implements AsyncService {
+public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
private final Map<String, QueueThread> queues = new HashMap<>();
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueDao queueDao;
-
- @Autowired
- private TaskDao taskDao;
+ public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException {
+ QueueDao queueDao = new QueueDao(c);
- @Transactional(propagation = REQUIRED)
- public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
log.info("registerQueue: ENTER");
Queue q = queueDao.findByName(name);
log.info("q = {}", q);
- final long interval_;
if (q == null) {
q = new Queue(name, interval);
queueDao.insert(q);
- interval_ = interval;
- } else {
- // Found an existing queue. Use the Settings from the database.
- interval_ = q.interval;
}
- final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable);
+ final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);
queues.put(name, queueThread);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- log.info("Transaction completed with status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval_, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
- }
- }
- });
-
log.info("registerQueue: LEAVE");
return q;
}
+ public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
+ final QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: " + name);
+ }
+
+ long interval = queueThread.queue.interval;
+ log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ queueThread.ping();
+ }
+ }, 10, interval, SECONDS);
+ Thread thread = new Thread(queueThread, name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
public Queue getQueue(String name) {
QueueThread queueThread = queues.get(name);
@@ -96,16 +74,17 @@ public class JdbcAsyncService implements AsyncService {
return queueThread.queue;
}
- @Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, String... args) {
- return scheduleInner(null, queue, args);
+ public Task schedule(Connection c, final Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, null, queue, args);
}
- public Task schedule(long parent, Queue queue, String... args) {
- return scheduleInner(parent, queue, args);
+ public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException {
+ return scheduleInner(c, parent, queue, args);
}
- private Task scheduleInner(Long parent, final Queue queue, String... args) {
+ private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
Date scheduled = new Date();
StringBuilder arguments = new StringBuilder();
@@ -114,27 +93,22 @@ public class JdbcAsyncService implements AsyncService {
}
long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args));
+ Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));
log.info("Created task = {}", task);
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- queues.get(queue.name).ping();
- }
- }
- });
-
return task;
}
- @Transactional
- public Task await(Task task, long timeout) {
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
final long start = currentTimeMillis();
final long end = start + timeout;
while (currentTimeMillis() < end) {
- task = update(task);
+ task = update(c, task);
+
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
try {
sleep(100);
@@ -146,8 +120,9 @@ public class JdbcAsyncService implements AsyncService {
return task;
}
- @Transactional(readOnly = true)
- public Task update(Task ref) {
+ public Task update(Connection c, Task ref) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
return taskDao.findById(ref.id);
}
}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 69466df..00c46b4 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -5,37 +5,33 @@ import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.transaction.TransactionException;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException;
+
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
- public boolean shouldRun = true;
-
- private boolean checkForNewTasks;
+ private final SqlEffectExecutor sqlEffectExecutor;
- private boolean busy;
+ private final AsyncService.AsyncCallable callable;
public final Queue queue;
- private final TaskDao taskDao;
+ public boolean shouldRun = true;
- private final TransactionTemplate transactionTemplate;
+ private boolean checkForNewTasks;
- private final AsyncService.AsyncCallable callable;
+ private boolean busy;
- QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
- this.queue = queue;
- this.taskDao = taskDao;
- this.transactionTemplate = transactionTemplate;
+ QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) {
+ this.sqlEffectExecutor = sqlEffectExecutor;
this.callable = callable;
+ this.queue = queue;
}
public void ping() {
@@ -52,19 +48,25 @@ class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
try {
- List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
- public List<Task> doInTransaction(TransactionStatus status) {
- return taskDao.findByNameAndCompletedIsNull(queue.name);
+// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
+// public List<Task> doInTransaction(TransactionStatus status) {
+// return taskDao.findByNameAndCompletedIsNull(queue.name);
+// }
+// });
+ List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection connection) throws SQLException {
+ return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
- if(tasks.size() > 0) {
+ if (tasks.size() > 0) {
for (final Task task : tasks) {
try {
executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
+ } catch (SqlExecutionException | TaskFailureException e) {
log.warn("Task execution failed", e);
}
}
@@ -96,24 +98,44 @@ class QueueThread implements Runnable {
private void executeTask(final Task task) {
final Date run = new Date();
log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- taskDao.update(task.registerRun());
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection connection) throws SQLException {
+ new TaskDao(connection).update(task.registerRun());
}
});
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// taskDao.update(task.registerRun());
+// }
+// });
+
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
try {
callable.run(task.arguments);
Date completed = new Date();
Task t = task.registerComplete(completed);
log.info("Completed task: {}", t);
- taskDao.update(t);
+ new TaskDao(c).update(t);
} catch (Exception e) {
throw new TaskFailureException(e);
}
}
});
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// try {
+// callable.run(task.arguments);
+// Date completed = new Date();
+// Task t = task.registerComplete(completed);
+// log.info("Completed task: {}", t);
+// taskDao.update(t);
+// } catch (Exception e) {
+// throw new TaskFailureException(e);
+// }
+// }
+// });
}
}
diff --git a/src/main/java/io/trygvis/async/SqlEffect.java b/src/main/java/io/trygvis/async/SqlEffect.java
new file mode 100644
index 0000000..d0c4e9b
--- /dev/null
+++ b/src/main/java/io/trygvis/async/SqlEffect.java
@@ -0,0 +1,12 @@
+package io.trygvis.async;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public interface SqlEffect<A> {
+ A doInConnection(Connection c) throws SQLException;
+
+ interface Void {
+ void doInConnection(Connection c) throws SQLException;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
new file mode 100644
index 0000000..c8abbd3
--- /dev/null
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -0,0 +1,39 @@
+package io.trygvis.async;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class SqlEffectExecutor {
+
+ private final DataSource dataSource;
+
+ public SqlEffectExecutor(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public <A> A execute(SqlEffect<A> effect) {
+ try (Connection c = dataSource.getConnection()) {
+ return effect.doInConnection(c);
+ } catch (SQLException e) {
+ throw new SqlExecutionException(e);
+ }
+ }
+
+ public void execute(SqlEffect.Void effect) {
+ try (Connection c = dataSource.getConnection()) {
+ effect.doInConnection(c);
+ } catch (SQLException e) {
+ throw new SqlExecutionException(e);
+ }
+ }
+
+ public static class SqlExecutionException extends RuntimeException {
+ public final SQLException exception;
+
+ public SqlExecutionException(SQLException ex) {
+ super(ex);
+ this.exception = ex;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
new file mode 100644
index 0000000..8517c68
--- /dev/null
+++ b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
@@ -0,0 +1,102 @@
+package io.trygvis.async.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.SqlEffectExecutor;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import javax.annotation.PostConstruct;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.springframework.transaction.annotation.Propagation.REQUIRED;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
+
+public class SpringJdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ private final TransactionTemplate transactionTemplate;
+
+ private final JdbcTemplate jdbcTemplate;
+
+ private SqlEffectExecutor sqlEffectExecutor;
+
+ final JdbcAsyncService jdbcAsyncService;
+
+ public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
+ this.transactionTemplate = transactionTemplate;
+ this.jdbcTemplate = jdbcTemplate;
+ jdbcAsyncService = new JdbcAsyncService();
+ sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource());
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) {
+ return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
+ @Override
+ public Queue doInConnection(Connection c) throws SQLException {
+
+ Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("Transaction completed with status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ jdbcAsyncService.startQueue(executor, name);
+ }
+ }
+ });
+
+ log.info("registerQueue: LEAVE");
+ return q;
+ }
+ });
+ }
+
+ public Queue getQueue(String name) {
+ return jdbcAsyncService.getQueue(name);
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Task schedule(final Queue queue, final String... args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.schedule(c, queue, args);
+ }
+ });
+ }
+
+ public Task schedule(final long parent, final Queue queue, final String... args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.schedule(c, parent, queue, args);
+ }
+ });
+ }
+
+ @Transactional(readOnly = true)
+ public Task update(final Task ref) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.update(c, ref);
+ }
+ });
+ }
+}