aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java12
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java61
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java135
-rw-r--r--src/main/java/io/trygvis/async/QueueStats.java15
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java149
5 files changed, 174 insertions, 198 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index daf99e4..9332596 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -1,9 +1,13 @@
package io.trygvis.async;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
+import java.sql.SQLException;
+import java.util.Date;
import java.util.List;
/**
@@ -11,13 +15,13 @@ import java.util.List;
*/
public interface AsyncService {
- void registerQueue(Queue queue, TaskEffect processor);
+ QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException;
- Queue getQueue(String name);
+ QueueExecutor getQueue(String name);
- Task schedule(Queue queue, List<String> args);
+ Task schedule(Queue queue, Date scheduled, List<String> args);
- Task schedule(long parent, Queue queue, List<String> args);
+ Task schedule(Queue queue, long parent, Date scheduled, List<String> args);
/**
* Polls for a new state of the execution.
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index ddfa150..fd4b38b 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,7 +1,8 @@
package io.trygvis.async;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
@@ -11,11 +12,8 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
@@ -23,7 +21,7 @@ import static java.lang.Thread.sleep;
public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final Map<String, QueueThread> queues = new HashMap<>();
+ private final Map<String, QueueController> queues = new HashMap<>();
private final QueueSystem queueSystem;
private final JdbcQueueService queueService;
@@ -33,49 +31,22 @@ public class JdbcAsyncService {
this.queueService = queueSystem.createQueueService();
}
- public synchronized void registerQueue(Queue queue, TaskEffect processor) {
- final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
-
- queues.put(queue.name, queueThread);
-
- log.info("registerQueue: LEAVE");
- }
-
- public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) {
- getQueueThread(queue.name).start(executor);
- }
-
- public synchronized void stopQueue(Queue queue) {
- QueueThread queueThread = queues.remove(queue.name);
-
- if (queueThread == null) {
- throw new RuntimeException("No such queue: '" + queue.name + "'.");
+ public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
+ if (queues.containsKey(queue.queue.name)) {
+ throw new IllegalArgumentException("Queue already exist.");
}
- queueThread.stop();
- }
-
- public Queue getQueue(String name) {
- QueueThread queueThread = getQueueThread(name);
+ QueueController queueController = new QueueController(queueSystem, req, processor, queue);
- return queueThread.queue;
- }
+ queues.put(queue.queue.name, queueController);
- public Task schedule(Connection c, final Queue queue, List<String> args) throws SQLException {
- return scheduleInner(c, null, queue, args);
- }
+ log.info("registerQueue: LEAVE");
- public Task schedule(Connection c, long parent, Queue queue, List<String> args) throws SQLException {
- return scheduleInner(c, parent, queue, args);
+ return queueController;
}
- private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- Date scheduled = new Date();
-
- Task task = queueService.schedule(c, queue, parent, scheduled, args);
- log.info("Created task = {}", task);
-
- return task;
+ public QueueExecutor getQueue(String name) {
+ return getQueueThread(name).queue;
}
public Task await(Connection c, Task task, long timeout) throws SQLException {
@@ -105,12 +76,12 @@ public class JdbcAsyncService {
return taskDao.findById(ref.id());
}
- private QueueThread getQueueThread(String name) {
- QueueThread queueThread = queues.get(name);
+ private QueueController getQueueThread(String name) {
+ QueueController queueController = queues.get(name);
- if (queueThread == null) {
+ if (queueController == null) {
throw new RuntimeException("No such queue: '" + name + "'.");
}
- return queueThread;
+ return queueController;
}
}
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
new file mode 100644
index 0000000..ea1c42d
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -0,0 +1,135 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueController {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ private final TaskEffect taskEffect;
+
+ private final TaskExecutionRequest req;
+
+ public final QueueExecutor queue;
+
+ private boolean shouldRun = true;
+
+ private boolean checkForNewTasks;
+
+ private boolean running;
+
+ private Thread thread;
+
+ private ScheduledThreadPoolExecutor executor;
+
+ public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
+ this.queueSystem = queueSystem;
+ this.req = req;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ this.taskEffect = taskEffect;
+ this.queue = queue;
+ }
+
+ private class QueueThread implements Runnable {
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = null;
+
+ try {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
+
+ if (tasks.size() > 0) {
+ queue.executeTasks(req, taskEffect, tasks, executor);
+ }
+ } catch (Throwable e) {
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (tasks != null && tasks.size() == req.chunkSize) {
+ continue;
+ }
+
+ synchronized (this) {
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ } else {
+ try {
+ wait(queue.queue.interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
+
+ log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval);
+
+ running = true;
+ this.executor = executor;
+
+ thread = new Thread(new QueueThread(), "queue: " + queue.queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.queue.name);
+
+ shouldRun = false;
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ thread.interrupt();
+ }
+ thread = null;
+ executor.shutdownNow();
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
new file mode 100644
index 0000000..7c75149
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -0,0 +1,15 @@
+package io.trygvis.async;
+
+public class QueueStats {
+ public final int totalMessageCount;
+ public final int okMessageCount;
+ public final int failedMessageCount;
+ public final int scheduledMessageCount;
+
+ public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) {
+ this.totalMessageCount = totalMessageCount;
+ this.okMessageCount = okMessageCount;
+ this.failedMessageCount = failedMessageCount;
+ this.scheduledMessageCount = scheduledMessageCount;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
deleted file mode 100644
index 61196b6..0000000
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package io.trygvis.async;
-
-import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueSystem;
-import io.trygvis.queue.Task;
-import io.trygvis.queue.TaskEffect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static io.trygvis.queue.QueueService.TaskExecutionRequest;
-import static io.trygvis.queue.Task.TaskState.NEW;
-
-class QueueThread implements Runnable {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final QueueSystem queueSystem;
-
- private final JdbcQueueService queueService;
-
- private final SqlEffectExecutor sqlEffectExecutor;
-
- private final TaskEffect taskEffect;
-
- public final Queue queue;
-
- private boolean shouldRun = true;
-
- private boolean checkForNewTasks;
-
- private boolean available;
-
- private boolean running;
-
- private Thread thread;
-
- QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) {
- this.queueSystem = queueSystem;
- this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
- this.queueService = queueSystem.createQueueService();
- this.taskEffect = taskEffect;
- this.queue = queue;
- }
-
- public void ping() {
- synchronized (this) {
- System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks);
- if (available) {
- log.info("Sending ping to " + queue);
- notifyAll();
- } else {
- checkForNewTasks = true;
- }
- }
- }
-
- public void run() {
- while (shouldRun) {
- try {
- TaskExecutionRequest req = new TaskExecutionRequest(100, true);
-
- List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100);
- }
- });
-
- log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
-
- if (tasks.size() > 0) {
- queueService.executeTask(req, taskEffect, tasks);
- }
-
- // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
- if (tasks.size() == req.chunkSize) {
- continue;
- }
- } catch (Throwable e) {
- if (shouldRun) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- synchronized (this) {
- available = true;
-
- if (checkForNewTasks) {
- log.info("Ping received!");
- checkForNewTasks = false;
- } else {
- try {
- wait(queue.interval);
- } catch (InterruptedException e) {
- // ignore
- }
- }
-
- available = false;
- }
- }
-
- log.info("Thread for queue {} has stopped.", queue.name);
- running = false;
- synchronized (this) {
- this.notifyAll();
- }
- }
-
- public synchronized void start(ScheduledThreadPoolExecutor executor) {
- if (running) {
- throw new IllegalStateException("Already running");
- }
-
- log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval);
-
- running = true;
-
- thread = new Thread(this, queue.name);
- thread.setDaemon(true);
- thread.start();
- }
-
- public synchronized void stop() {
- if (!running) {
- return;
- }
-
- log.info("Stopping thread for queue {}", queue.name);
-
- shouldRun = false;
-
- thread.interrupt();
- while (running) {
- try {
- wait(1000);
- } catch (InterruptedException e) {
- // continue
- }
- thread.interrupt();
- }
- thread = null;
- }
-}