From 1ec4fae12c5e5363591013e5a759590d913d6782 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 12:07:43 +0200 Subject: wip --- src/main/java/io/trygvis/async/AsyncService.java | 12 +- .../java/io/trygvis/async/JdbcAsyncService.java | 61 ++----- .../java/io/trygvis/async/QueueController.java | 135 +++++++++++++++ src/main/java/io/trygvis/async/QueueStats.java | 15 ++ src/main/java/io/trygvis/async/QueueThread.java | 149 ----------------- .../java/io/trygvis/queue/JdbcQueueService.java | 131 ++------------- src/main/java/io/trygvis/queue/QueueExecutor.java | 184 +++++++++++++++++++++ src/main/java/io/trygvis/queue/QueueService.java | 4 +- src/main/java/io/trygvis/queue/QueueSystem.java | 2 +- .../io/trygvis/spring/SpringJdbcAsyncService.java | 43 +++-- .../java/io/trygvis/spring/SpringQueueService.java | 19 +-- src/test/java/io/trygvis/test/Main.java | 4 +- .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 52 +++--- .../io/trygvis/test/jdbc/PlainJavaExample.java | 24 +-- .../io/trygvis/test/spring/PlainSpringTest.java | 15 +- 15 files changed, 467 insertions(+), 383 deletions(-) create mode 100644 src/main/java/io/trygvis/async/QueueController.java create mode 100644 src/main/java/io/trygvis/async/QueueStats.java delete mode 100644 src/main/java/io/trygvis/async/QueueThread.java create mode 100644 src/main/java/io/trygvis/queue/QueueExecutor.java diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index daf99e4..9332596 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -1,9 +1,13 @@ package io.trygvis.async; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; +import java.sql.SQLException; +import java.util.Date; import java.util.List; /** @@ -11,13 +15,13 @@ import java.util.List; */ public interface AsyncService { - void registerQueue(Queue queue, TaskEffect processor); + QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException; - Queue getQueue(String name); + QueueExecutor getQueue(String name); - Task schedule(Queue queue, List args); + Task schedule(Queue queue, Date scheduled, List args); - Task schedule(long parent, Queue queue, List args); + Task schedule(Queue queue, long parent, Date scheduled, List args); /** * Polls for a new state of the execution. diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index ddfa150..fd4b38b 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,7 +1,8 @@ package io.trygvis.async; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; @@ -11,11 +12,8 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; @@ -23,7 +21,7 @@ import static java.lang.Thread.sleep; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map queues = new HashMap<>(); + private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; private final JdbcQueueService queueService; @@ -33,49 +31,22 @@ public class JdbcAsyncService { this.queueService = queueSystem.createQueueService(); } - public synchronized void registerQueue(Queue queue, TaskEffect processor) { - final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); - - queues.put(queue.name, queueThread); - - log.info("registerQueue: LEAVE"); - } - - public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) { - getQueueThread(queue.name).start(executor); - } - - public synchronized void stopQueue(Queue queue) { - QueueThread queueThread = queues.remove(queue.name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + queue.name + "'."); + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); } - queueThread.stop(); - } - - public Queue getQueue(String name) { - QueueThread queueThread = getQueueThread(name); + QueueController queueController = new QueueController(queueSystem, req, processor, queue); - return queueThread.queue; - } + queues.put(queue.queue.name, queueController); - public Task schedule(Connection c, final Queue queue, List args) throws SQLException { - return scheduleInner(c, null, queue, args); - } + log.info("registerQueue: LEAVE"); - public Task schedule(Connection c, long parent, Queue queue, List args) throws SQLException { - return scheduleInner(c, parent, queue, args); + return queueController; } - private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { - Date scheduled = new Date(); - - Task task = queueService.schedule(c, queue, parent, scheduled, args); - log.info("Created task = {}", task); - - return task; + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; } public Task await(Connection c, Task task, long timeout) throws SQLException { @@ -105,12 +76,12 @@ public class JdbcAsyncService { return taskDao.findById(ref.id()); } - private QueueThread getQueueThread(String name) { - QueueThread queueThread = queues.get(name); + private QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); - if (queueThread == null) { + if (queueController == null) { throw new RuntimeException("No such queue: '" + name + "'."); } - return queueThread; + return queueController; } } diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java new file mode 100644 index 0000000..ea1c42d --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -0,0 +1,135 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueController { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final TaskEffect taskEffect; + + private final TaskExecutionRequest req; + + public final QueueExecutor queue; + + private boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean running; + + private Thread thread; + + private ScheduledThreadPoolExecutor executor; + + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { + this.queueSystem = queueSystem; + this.req = req; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.taskEffect = taskEffect; + this.queue = queue; + } + + private class QueueThread implements Runnable { + public void run() { + while (shouldRun) { + List tasks = null; + + try { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); + + if (tasks.size() > 0) { + queue.executeTasks(req, taskEffect, tasks, executor); + } + } catch (Throwable e) { + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } + } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (tasks != null && tasks.size() == req.chunkSize) { + continue; + } + + synchronized (this) { + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + } else { + try { + wait(queue.queue.interval); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + log.info("Thread for queue {} has stopped.", queue.queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } + + log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval); + + running = true; + this.executor = executor; + + thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.queue.name); + + shouldRun = false; + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue + } + thread.interrupt(); + } + thread = null; + executor.shutdownNow(); + } +} diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java new file mode 100644 index 0000000..7c75149 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -0,0 +1,15 @@ +package io.trygvis.async; + +public class QueueStats { + public final int totalMessageCount; + public final int okMessageCount; + public final int failedMessageCount; + public final int scheduledMessageCount; + + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) { + this.totalMessageCount = totalMessageCount; + this.okMessageCount = okMessageCount; + this.failedMessageCount = failedMessageCount; + this.scheduledMessageCount = scheduledMessageCount; + } +} diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java deleted file mode 100644 index 61196b6..0000000 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ /dev/null @@ -1,149 +0,0 @@ -package io.trygvis.async; - -import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueSystem; -import io.trygvis.queue.Task; -import io.trygvis.queue.TaskEffect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.SQLException; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static io.trygvis.queue.QueueService.TaskExecutionRequest; -import static io.trygvis.queue.Task.TaskState.NEW; - -class QueueThread implements Runnable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final QueueSystem queueSystem; - - private final JdbcQueueService queueService; - - private final SqlEffectExecutor sqlEffectExecutor; - - private final TaskEffect taskEffect; - - public final Queue queue; - - private boolean shouldRun = true; - - private boolean checkForNewTasks; - - private boolean available; - - private boolean running; - - private Thread thread; - - QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { - this.queueSystem = queueSystem; - this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; - this.queueService = queueSystem.createQueueService(); - this.taskEffect = taskEffect; - this.queue = queue; - } - - public void ping() { - synchronized (this) { - System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks); - if (available) { - log.info("Sending ping to " + queue); - notifyAll(); - } else { - checkForNewTasks = true; - } - } - } - - public void run() { - while (shouldRun) { - try { - TaskExecutionRequest req = new TaskExecutionRequest(100, true); - - List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { - @Override - public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); - } - }); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - if (tasks.size() > 0) { - queueService.executeTask(req, taskEffect, tasks); - } - - // If we found exactly the same number of tasks that we asked for, there is most likely more to go. - if (tasks.size() == req.chunkSize) { - continue; - } - } catch (Throwable e) { - if (shouldRun) { - log.warn("Error while executing tasks.", e); - } - } - - synchronized (this) { - available = true; - - if (checkForNewTasks) { - log.info("Ping received!"); - checkForNewTasks = false; - } else { - try { - wait(queue.interval); - } catch (InterruptedException e) { - // ignore - } - } - - available = false; - } - } - - log.info("Thread for queue {} has stopped.", queue.name); - running = false; - synchronized (this) { - this.notifyAll(); - } - } - - public synchronized void start(ScheduledThreadPoolExecutor executor) { - if (running) { - throw new IllegalStateException("Already running"); - } - - log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval); - - running = true; - - thread = new Thread(this, queue.name); - thread.setDaemon(true); - thread.start(); - } - - public synchronized void stop() { - if (!running) { - return; - } - - log.info("Stopping thread for queue {}", queue.name); - - shouldRun = false; - - thread.interrupt(); - while (running) { - try { - wait(1000); - } catch (InterruptedException e) { - // continue - } - thread.interrupt(); - } - thread = null; - } -} diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index cb7af4b..a366838 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -1,121 +1,42 @@ package io.trygvis.queue; -import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; -import java.util.List; - -import static io.trygvis.queue.QueueService.TaskExecutionRequest; -import static io.trygvis.queue.Task.TaskState.NEW; +import java.util.HashMap; +import java.util.Map; public class JdbcQueueService { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final QueueSystem queueSystem; private final SqlEffectExecutor sqlEffectExecutor; + private final Map queues = new HashMap<>(); + JdbcQueueService(QueueSystem queueSystem) { this.queueSystem = queueSystem; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - log.info("Consuming tasks: request={}", req); - - List tasks; - do { - tasks = sqlEffectExecutor.transaction(new SqlEffect>() { - @Override - public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); - } - }); - - log.info("Consuming chunk with {} tasks", tasks.size()); - - applyTasks(req, effect, tasks); - } while (tasks.size() > 0); - } - - public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List tasks) throws SQLException { - applyTasks(req, taskEffect, tasks); - } + public synchronized QueueExecutor getQueue(String name) { + QueueExecutor queueExecutor = queues.get(name); - /** - * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. - */ - private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List tasks) throws SQLException { - for (Task task : tasks) { - boolean ok = applyTask(effect, task); - - if (!ok && req.stopOnError) { - throw new RuntimeException("Error while executing task, id=" + task.id()); - } + if (queueExecutor != null) { + return queueExecutor; } - } - - private boolean applyTask(TaskEffect effect, final Task task) throws SQLException { - try { - final Date run = new Date(); - Integer count = sqlEffectExecutor.transaction(new SqlEffect() { - @Override - public Integer doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).update(task.markProcessing()); - } - }); - - if (count == 1) { - log.info("Executing task {}", task.id()); - } else { - log.trace("Missed task {}", task.id()); - } - - final List newTasks = effect.apply(task); - - final Date now = new Date(); - - log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - for (Task newTask : newTasks) { - schedule(c, newTask); - } - - queueSystem.createTaskDao(c).update(task.markOk(now)); - } - }); - - return true; - } catch (Exception e) { - final Date now = new Date(); - log.error("Unable to execute task, id=" + task.id(), e); - - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - taskDao.update(task.markFailed(now)); - } - }); + throw new IllegalArgumentException("No such queue: " + name); + } - if (e instanceof SQLException) { - throw ((SQLException) e); - } + public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException { + QueueExecutor queueExecutor = queues.get(name); - return false; + if (queueExecutor != null) { + return queueExecutor; } - } - public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { QueueDao queueDao = queueSystem.createQueueDao(c); Queue q = queueDao.findByName(name); @@ -129,26 +50,8 @@ public class JdbcQueueService { queueDao.insert(q); } - return q; - } - - public void schedule(Connection c, Task task) throws SQLException { - schedule(c, task.queue, task.parent, task.scheduled, task.arguments); - } - - public Task schedule(Connection c, Queue queue, Date scheduled, List arguments) throws SQLException { - return schedule(c, queue.name, null, scheduled, arguments); - } - - public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List arguments) throws SQLException { - return schedule(c, queue.name, parent, scheduled, arguments); - } - - private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); - - return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q); + queues.put(name, queueExecutor); + return queueExecutor; } } diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..a1eb3b7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,184 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + public final Queue queue; + + private final Stats stats = new Stats(); + + public enum TaskExecutionResult { + OK, + FAILED, + MISSED + } + + public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = sqlEffectExecutor; + this.queue = queue; + } + + private static class Stats { + public int total; + public int ok; + public int failed; + public int scheduled; + + public QueueStats toStats() { + return new QueueStats(total, ok, failed, scheduled); + } + } + + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + @Override + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); + + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); + } + + public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List tasks, + ScheduledThreadPoolExecutor executor) { + executor.execute(new Runnable() { + @Override + public void run() { + applyTasks(req, taskEffect, tasks); + } + }); + } + + private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List tasks) { + for (Task task : tasks) { + TaskExecutionResult result = applyTask(effect, task); + + if (result == FAILED && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + /** + * Executed each task in its own transaction. + *

+ * If the task fails, the status is set to error in a separate transaction. + */ + private TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + try { + Integer count = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing()); + } + }); + + if (count == 0) { + log.trace("Missed task {}", task.id()); + return MISSED; + } + + log.info("Executing task {}", task.id()); + + final List newTasks = effect.apply(task); + + final Date now = new Date(); + + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } + + queueSystem.createTaskDao(c).update(task.markOk(now)); + } + }); + + synchronized (stats) { + stats.total++; + stats.ok++; + } + + return OK; + } catch (Exception e) { + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + synchronized (stats) { + stats.total++; + stats.failed++; + } + + try { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + taskDao.update(task.markFailed(now)); + } + }); + } catch (SQLException e1) { + log.error("Error while marking task as failed.", e1); + } + + return FAILED; + } + } + + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); + } + + public Task schedule(Connection c, Date scheduled, List arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); + } + + public Task schedule(Connection c, long parent, Date scheduled, List arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); + } + + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + synchronized (stats) { + stats.scheduled++; + } + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index d97eaf0..eee14ed 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -5,9 +5,7 @@ import java.util.Date; import java.util.List; public interface QueueService { - void consume(Queue queue, TaskExecutionRequest req, TaskEffect effect) throws SQLException; - - Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException; + QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException; void schedule(Queue queue, Date scheduled, List arguments) throws SQLException; diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java index 3b0c018..6710bf4 100644 --- a/src/main/java/io/trygvis/queue/QueueSystem.java +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -15,7 +15,7 @@ public class QueueSystem { public final SqlEffectExecutor sqlEffectExecutor; - public final JdbcQueueService queueService; + private final JdbcQueueService queueService; private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException { sqlEffectExecutor.transaction(new SqlEffect.Void() { diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java index b27e94d..a1c9cda 100644 --- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java +++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java @@ -2,8 +2,12 @@ package io.trygvis.spring; import io.trygvis.async.AsyncService; import io.trygvis.async.JdbcAsyncService; -import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.async.QueueController; +import io.trygvis.async.SqlEffect; +import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -17,6 +21,7 @@ import org.springframework.transaction.support.TransactionSynchronizationAdapter import java.sql.Connection; import java.sql.SQLException; +import java.util.Date; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -27,52 +32,68 @@ import static org.springframework.transaction.support.TransactionSynchronization public class SpringJdbcAsyncService implements AsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + private final ScheduledThreadPoolExecutor executor; private final JdbcTemplate jdbcTemplate; private final JdbcAsyncService jdbcAsyncService; + private final JdbcQueueService queueService; + + private final QueueSystem queueSystem; + public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + this.queueSystem = queueSystem; this.jdbcTemplate = jdbcTemplate; jdbcAsyncService = new JdbcAsyncService(queueSystem); + queueService = queueSystem.createQueueService(); + executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); } @Transactional(propagation = REQUIRED) - public void registerQueue(final Queue queue, final TaskEffect processor) { - jdbcAsyncService.registerQueue(queue, processor); + public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException { + QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public QueueExecutor doInConnection(Connection c) throws SQLException { + return queueService.lookupQueue(c, queue.name, queue.interval, true); + } + }); + + final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { - jdbcAsyncService.startQueue(queue, executor); + queueController.start(executor); } } }); - log.info("registerQueue: LEAVE"); + return queueController; } - public Queue getQueue(String name) { + public QueueExecutor getQueue(String name) { return jdbcAsyncService.getQueue(name); } @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, final List args) { + public Task schedule(final Queue queue, final Date scheduled, final List args) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.schedule(c, queue, args); + QueueExecutor queueExecutor = queueService.getQueue(queue.name); + return queueExecutor.schedule(c, scheduled, args); } }); } - public Task schedule(final long parent, final Queue queue, final List args) { + public Task schedule(final Queue queue, final long parent, final Date scheduled, final List args) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.schedule(c, parent, queue, args); + QueueExecutor queueExecutor = queueService.getQueue(queue.name); + return queueExecutor.schedule(c, parent, scheduled, args); } }); } diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java index 271e9bf..2027ab5 100644 --- a/src/main/java/io/trygvis/spring/SpringQueueService.java +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -2,9 +2,9 @@ package io.trygvis.spring; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; -import io.trygvis.queue.TaskEffect; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.JdbcTemplate; @@ -23,21 +23,14 @@ public class SpringQueueService implements QueueService { public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; - this.queueService = queueSystem.queueService; - } - - /** - * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.QueueService.TaskExecutionRequest, io.trygvis.queue.TaskEffect) - */ - public void consume(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - queueService.consumeAll(queue, req, effect); + this.queueService = queueSystem.createQueueService(); } @Transactional - public Queue getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException { - return jdbcTemplate.execute(new ConnectionCallback() { + public QueueExecutor getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException { + return jdbcTemplate.execute(new ConnectionCallback() { @Override - public Queue doInConnection(Connection c) throws SQLException, DataAccessException { + public QueueExecutor doInConnection(Connection c) throws SQLException, DataAccessException { return queueService.lookupQueue(c, name, interval, autoCreate); } }); @@ -48,7 +41,7 @@ public class SpringQueueService implements QueueService { jdbcTemplate.execute(new ConnectionCallback() { @Override public Object doInConnection(Connection c) throws SQLException, DataAccessException { - queueService.schedule(c, queue, scheduled, arguments); + queueService.getQueue(queue.name).schedule(c, scheduled, arguments); return null; } }); diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java index 0721ec9..f03d6fa 100755 --- a/src/test/java/io/trygvis/test/Main.java +++ b/src/test/java/io/trygvis/test/Main.java @@ -74,7 +74,9 @@ public class Main { final Queue q = null; // queueService.lookupQueue(c, "create-article", 1); - asyncService.registerQueue(q, createArticleCallable); + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + + asyncService.registerQueue(q, req, createArticleCallable); // log.info("queue registered: ref = {}", q); // asyncService.registerQueue("update-queue", 1, updateArticleCallable); diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index 8d981f2..dd478d7 100644 --- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -1,10 +1,12 @@ package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -38,36 +40,36 @@ public class AsyncConsumerExample { } }; - public static class Consumer { - public static void main(String[] args) throws Exception { - System.out.println("Starting consumer"); + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); - DataSource ds = createDataSource(); + DataSource ds = createDataSource(); - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcAsyncService asyncService = queueSystem.createAsyncService(); - final JdbcQueueService queueService = queueSystem.queueService; + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.createQueueService(); - Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { - @Override - public Queue[] doInConnection(Connection c) throws SQLException { - return new Queue[]{ - queueService.lookupQueue(c, inputName, interval, true), - queueService.lookupQueue(c, outputName, interval, true) - }; - } - }); + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + return new QueueExecutor[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); - final Queue input = queues[0]; - final Queue output = queues[1]; + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; - asyncService.registerQueue(input, adder); + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); - asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2)); - Thread.sleep(5 * 1000); - asyncService.stopQueue(input); - } + QueueController controller = asyncService.registerQueue(input, req, adder); + + controller.start(new ScheduledThreadPoolExecutor(2)); + Thread.sleep(60 * 1000); + controller.stop(); } } diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java index 994c310..0e11ab3 100644 --- a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java @@ -3,7 +3,7 @@ package io.trygvis.test.jdbc; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueStats; import io.trygvis.queue.QueueSystem; @@ -42,12 +42,12 @@ public class PlainJavaExample { SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - final JdbcQueueService queueService = queueSystem.queueService; + final JdbcQueueService queueService = queueSystem.createQueueService(); - Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { @Override - public Queue[] doInConnection(Connection c) throws SQLException { - Queue[] queues = { + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + QueueExecutor[] queues = { queueService.lookupQueue(c, inputName, interval, true), queueService.lookupQueue(c, outputName, interval, true)}; @@ -63,12 +63,12 @@ public class PlainJavaExample { } }); - final Queue input = queues[0]; - final Queue output = queues[1]; + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false); - queueService.consumeAll(input, req, new TaskEffect() { + input.consumeAll(req, new TaskEffect() { public List apply(Task task) throws Exception { Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); @@ -76,7 +76,7 @@ public class PlainJavaExample { System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); if (r.nextInt(3000) > 0) { - return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); + return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b))); } throw new RuntimeException("Simulated exception while processing task."); @@ -98,9 +98,9 @@ public class PlainJavaExample { SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - final JdbcQueueService queueService = queueSystem.queueService; + final JdbcQueueService queueService = queueSystem.createQueueService(); - final Queue queue; + final QueueExecutor queue; try (Connection c = ds.getConnection()) { queue = queueService.lookupQueue(c, inputName, interval, true); c.commit(); @@ -112,7 +112,7 @@ public class PlainJavaExample { @Override public void doInConnection(Connection c) throws SQLException { for (int j = 0; j < chunk; j++) { - queueService.schedule(c, queue, new Date(), asList("10", "20")); + queue.schedule(c, new Date(), asList("10", "20")); } } }); diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java index d06d8d6..38d3361 100644 --- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -1,7 +1,7 @@ package io.trygvis.test.spring; import io.trygvis.async.AsyncService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.sql.SQLException; +import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -33,6 +34,8 @@ public class PlainSpringTest { @Autowired private QueueService queueService; + private final QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + static { String username = getProperty("user.name"); setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); @@ -42,9 +45,9 @@ public class PlainSpringTest { @Test public void testBasic() throws SQLException, InterruptedException { - Queue test = queueService.getQueue("test", 10, true); + QueueExecutor test = queueService.getQueue("test", 10, true); final AtomicReference> ref = new AtomicReference<>(); - asyncService.registerQueue(test, new TaskEffect() { + asyncService.registerQueue(test.queue, req, new TaskEffect() { @Override public List apply(Task task) throws Exception { System.out.println("PlainSpringTest.run"); @@ -58,12 +61,14 @@ public class PlainSpringTest { synchronized (ref) { System.out.println("Scheduling task"); - asyncService.schedule(test, asList("hello", "world")); - System.out.println("Waiting"); + queueService.schedule(test.queue, new Date(), asList("hello", "world")); + System.out.println("Task scheduled, waiting"); ref.wait(1000); + System.out.println("Back!"); } List args = ref.get(); + System.out.println("args = " + args); assertNotNull(args); assertThat(args).containsExactly("hello", "world"); } -- cgit v1.2.3