diff options
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rwxr-xr-x | src/main/java/io/trygvis/async/AsyncService.java | 7 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 22 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 84 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/SqlEffectExecutor.java | 55 |
4 files changed, 85 insertions, 83 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index 17d53e9..daf99e4 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -2,6 +2,7 @@ package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import java.util.List; @@ -10,7 +11,7 @@ import java.util.List; */ public interface AsyncService { - void registerQueue(Queue queue, final AsyncService.AsyncCallable callable); + void registerQueue(Queue queue, TaskEffect processor); Queue getQueue(String name); @@ -22,8 +23,4 @@ public interface AsyncService { * Polls for a new state of the execution. */ Task update(Task ref); - - interface AsyncCallable { - void run(List<String> arguments) throws Exception; - } } diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 310c59b..6baa56e 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,8 +1,11 @@ package io.trygvis.async; +import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,8 +26,16 @@ public class JdbcAsyncService { private final Map<String, QueueThread> queues = new HashMap<>(); - public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) { - final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue); + private final QueueSystem queueSystem; + private final JdbcQueueService queueService; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.queueService = queueSystem.createQueueService(); + } + + public void registerQueue(Queue queue, TaskEffect processor) { + final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); queues.put(queue.name, queueThread); @@ -69,12 +80,11 @@ public class JdbcAsyncService { } private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException { - TaskDao taskDao = new TaskDao(c); + TaskDao taskDao = queueSystem.createTaskDao(c); Date scheduled = new Date(); - long id = taskDao.insert(parent, queue.name, scheduled, args); - Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args); + Task task = queueService.schedule(c, queue, parent, scheduled, args); log.info("Created task = {}", task); return task; @@ -102,7 +112,7 @@ public class JdbcAsyncService { } public Task update(Connection c, Task ref) throws SQLException { - TaskDao taskDao = new TaskDao(c); + TaskDao taskDao = queueSystem.createTaskDao(c); return taskDao.findById(ref.id()); } diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 00c46b4..33753a3 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -1,24 +1,27 @@ package io.trygvis.async; +import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; -import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; import java.util.List; -import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException; - class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); + private final QueueSystem queueSystem; + + private final JdbcQueueService queueService; + private final SqlEffectExecutor sqlEffectExecutor; - private final AsyncService.AsyncCallable callable; + private final TaskEffect taskEffect; public final Queue queue; @@ -28,9 +31,11 @@ class QueueThread implements Runnable { private boolean busy; - QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) { - this.sqlEffectExecutor = sqlEffectExecutor; - this.callable = callable; + QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.queueService = queueSystem.createQueueService(); + this.taskEffect = taskEffect; this.queue = queue; } @@ -48,28 +53,17 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { -// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { -// public List<Task> doInTransaction(TransactionStatus status) { -// return taskDao.findByNameAndCompletedIsNull(queue.name); -// } -// }); - List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() { + List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override - public List<Task> doInConnection(Connection connection) throws SQLException { - return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name); + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (SqlExecutionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } + queueService.executeTask(taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); @@ -94,48 +88,4 @@ class QueueThread implements Runnable { } } } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - sqlEffectExecutor.execute(new SqlEffect.Void() { - @Override - public void doInConnection(Connection connection) throws SQLException { - new TaskDao(connection).update(task.registerRun()); - } - }); -// transactionTemplate.execute(new TransactionCallbackWithoutResult() { -// protected void doInTransactionWithoutResult(TransactionStatus status) { -// taskDao.update(task.registerRun()); -// } -// }); - - sqlEffectExecutor.execute(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - try { - callable.run(task.arguments); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - new TaskDao(c).update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); -// transactionTemplate.execute(new TransactionCallbackWithoutResult() { -// protected void doInTransactionWithoutResult(TransactionStatus status) { -// try { -// callable.run(task.arguments); -// Date completed = new Date(); -// Task t = task.registerComplete(completed); -// log.info("Completed task: {}", t); -// taskDao.update(t); -// } catch (Exception e) { -// throw new TaskFailureException(e); -// } -// } -// }); - } } diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java index c8abbd3..e03cf5e 100644 --- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -12,22 +12,67 @@ public class SqlEffectExecutor { this.dataSource = dataSource; } - public <A> A execute(SqlEffect<A> effect) { + public <A> A transaction(SqlEffect<A> effect) { +/* + int pid; + try (Connection c = dataSource.getConnection()) { - return effect.doInConnection(c); + + try (Statement statement = c.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); + rs.next(); + pid = rs.getInt(1); + } + + System.out.println("pid = " + pid); + + try { + effect.doInConnection(c); + c.commit(); + } catch (SQLException e) { + c.rollback(); + e.printStackTrace(); + } finally { + System.out.println("Closing pid=" + pid); + try { + c.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } } catch (SQLException e) { + e.printStackTrace(); throw new SqlExecutionException(e); } - } +*/ - public void execute(SqlEffect.Void effect) { try (Connection c = dataSource.getConnection()) { - effect.doInConnection(c); + boolean ok = false; + try { + A a = effect.doInConnection(c); + c.commit(); + ok = true; + return a; + } finally { + if (!ok) { + c.rollback(); + } + } } catch (SQLException e) { throw new SqlExecutionException(e); } } + public void transaction(final SqlEffect.Void effect) { + transaction(new SqlEffect<Object>() { + @Override + public Object doInConnection(Connection c) throws SQLException { + effect.doInConnection(c); + return null; + } + }); + } + public static class SqlExecutionException extends RuntimeException { public final SQLException exception; |