From 1ec4fae12c5e5363591013e5a759590d913d6782 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 12:07:43 +0200 Subject: wip --- src/main/java/io/trygvis/async/AsyncService.java | 12 +- .../java/io/trygvis/async/JdbcAsyncService.java | 61 +++------ .../java/io/trygvis/async/QueueController.java | 135 +++++++++++++++++++ src/main/java/io/trygvis/async/QueueStats.java | 15 +++ src/main/java/io/trygvis/async/QueueThread.java | 149 --------------------- 5 files changed, 174 insertions(+), 198 deletions(-) create mode 100644 src/main/java/io/trygvis/async/QueueController.java create mode 100644 src/main/java/io/trygvis/async/QueueStats.java delete mode 100644 src/main/java/io/trygvis/async/QueueThread.java (limited to 'src/main/java/io/trygvis/async') diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index daf99e4..9332596 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -1,9 +1,13 @@ package io.trygvis.async; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; +import java.sql.SQLException; +import java.util.Date; import java.util.List; /** @@ -11,13 +15,13 @@ import java.util.List; */ public interface AsyncService { - void registerQueue(Queue queue, TaskEffect processor); + QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException; - Queue getQueue(String name); + QueueExecutor getQueue(String name); - Task schedule(Queue queue, List args); + Task schedule(Queue queue, Date scheduled, List args); - Task schedule(long parent, Queue queue, List args); + Task schedule(Queue queue, long parent, Date scheduled, List args); /** * Polls for a new state of the execution. diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index ddfa150..fd4b38b 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,7 +1,8 @@ package io.trygvis.async; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; @@ -11,11 +12,8 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; @@ -23,7 +21,7 @@ import static java.lang.Thread.sleep; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map queues = new HashMap<>(); + private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; private final JdbcQueueService queueService; @@ -33,49 +31,22 @@ public class JdbcAsyncService { this.queueService = queueSystem.createQueueService(); } - public synchronized void registerQueue(Queue queue, TaskEffect processor) { - final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); - - queues.put(queue.name, queueThread); - - log.info("registerQueue: LEAVE"); - } - - public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) { - getQueueThread(queue.name).start(executor); - } - - public synchronized void stopQueue(Queue queue) { - QueueThread queueThread = queues.remove(queue.name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + queue.name + "'."); + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); } - queueThread.stop(); - } - - public Queue getQueue(String name) { - QueueThread queueThread = getQueueThread(name); + QueueController queueController = new QueueController(queueSystem, req, processor, queue); - return queueThread.queue; - } + queues.put(queue.queue.name, queueController); - public Task schedule(Connection c, final Queue queue, List args) throws SQLException { - return scheduleInner(c, null, queue, args); - } + log.info("registerQueue: LEAVE"); - public Task schedule(Connection c, long parent, Queue queue, List args) throws SQLException { - return scheduleInner(c, parent, queue, args); + return queueController; } - private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { - Date scheduled = new Date(); - - Task task = queueService.schedule(c, queue, parent, scheduled, args); - log.info("Created task = {}", task); - - return task; + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; } public Task await(Connection c, Task task, long timeout) throws SQLException { @@ -105,12 +76,12 @@ public class JdbcAsyncService { return taskDao.findById(ref.id()); } - private QueueThread getQueueThread(String name) { - QueueThread queueThread = queues.get(name); + private QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); - if (queueThread == null) { + if (queueController == null) { throw new RuntimeException("No such queue: '" + name + "'."); } - return queueThread; + return queueController; } } diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java new file mode 100644 index 0000000..ea1c42d --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -0,0 +1,135 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueController { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final TaskEffect taskEffect; + + private final TaskExecutionRequest req; + + public final QueueExecutor queue; + + private boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean running; + + private Thread thread; + + private ScheduledThreadPoolExecutor executor; + + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { + this.queueSystem = queueSystem; + this.req = req; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.taskEffect = taskEffect; + this.queue = queue; + } + + private class QueueThread implements Runnable { + public void run() { + while (shouldRun) { + List tasks = null; + + try { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); + + if (tasks.size() > 0) { + queue.executeTasks(req, taskEffect, tasks, executor); + } + } catch (Throwable e) { + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } + } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (tasks != null && tasks.size() == req.chunkSize) { + continue; + } + + synchronized (this) { + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + } else { + try { + wait(queue.queue.interval); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + log.info("Thread for queue {} has stopped.", queue.queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } + + log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval); + + running = true; + this.executor = executor; + + thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.queue.name); + + shouldRun = false; + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue + } + thread.interrupt(); + } + thread = null; + executor.shutdownNow(); + } +} diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java new file mode 100644 index 0000000..7c75149 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -0,0 +1,15 @@ +package io.trygvis.async; + +public class QueueStats { + public final int totalMessageCount; + public final int okMessageCount; + public final int failedMessageCount; + public final int scheduledMessageCount; + + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) { + this.totalMessageCount = totalMessageCount; + this.okMessageCount = okMessageCount; + this.failedMessageCount = failedMessageCount; + this.scheduledMessageCount = scheduledMessageCount; + } +} diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java deleted file mode 100644 index 61196b6..0000000 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ /dev/null @@ -1,149 +0,0 @@ -package io.trygvis.async; - -import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueSystem; -import io.trygvis.queue.Task; -import io.trygvis.queue.TaskEffect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.SQLException; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static io.trygvis.queue.QueueService.TaskExecutionRequest; -import static io.trygvis.queue.Task.TaskState.NEW; - -class QueueThread implements Runnable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final QueueSystem queueSystem; - - private final JdbcQueueService queueService; - - private final SqlEffectExecutor sqlEffectExecutor; - - private final TaskEffect taskEffect; - - public final Queue queue; - - private boolean shouldRun = true; - - private boolean checkForNewTasks; - - private boolean available; - - private boolean running; - - private Thread thread; - - QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { - this.queueSystem = queueSystem; - this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; - this.queueService = queueSystem.createQueueService(); - this.taskEffect = taskEffect; - this.queue = queue; - } - - public void ping() { - synchronized (this) { - System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks); - if (available) { - log.info("Sending ping to " + queue); - notifyAll(); - } else { - checkForNewTasks = true; - } - } - } - - public void run() { - while (shouldRun) { - try { - TaskExecutionRequest req = new TaskExecutionRequest(100, true); - - List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { - @Override - public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); - } - }); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - if (tasks.size() > 0) { - queueService.executeTask(req, taskEffect, tasks); - } - - // If we found exactly the same number of tasks that we asked for, there is most likely more to go. - if (tasks.size() == req.chunkSize) { - continue; - } - } catch (Throwable e) { - if (shouldRun) { - log.warn("Error while executing tasks.", e); - } - } - - synchronized (this) { - available = true; - - if (checkForNewTasks) { - log.info("Ping received!"); - checkForNewTasks = false; - } else { - try { - wait(queue.interval); - } catch (InterruptedException e) { - // ignore - } - } - - available = false; - } - } - - log.info("Thread for queue {} has stopped.", queue.name); - running = false; - synchronized (this) { - this.notifyAll(); - } - } - - public synchronized void start(ScheduledThreadPoolExecutor executor) { - if (running) { - throw new IllegalStateException("Already running"); - } - - log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval); - - running = true; - - thread = new Thread(this, queue.name); - thread.setDaemon(true); - thread.start(); - } - - public synchronized void stop() { - if (!running) { - return; - } - - log.info("Stopping thread for queue {}", queue.name); - - shouldRun = false; - - thread.interrupt(); - while (running) { - try { - wait(1000); - } catch (InterruptedException e) { - // continue - } - thread.interrupt(); - } - thread = null; - } -} -- cgit v1.2.3