aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java10
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java70
-rw-r--r--src/main/java/io/trygvis/async/QueueStats.java5
3 files changed, 50 insertions, 35 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 57ab5c6..46f1f30 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,14 +1,10 @@
package io.trygvis.async;
-import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
-import io.trygvis.queue.TaskDao;
import io.trygvis.queue.TaskEffect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
@@ -19,16 +15,12 @@ import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
public class JdbcAsyncService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
private final Map<String, QueueController> queues = new HashMap<>();
private final QueueSystem queueSystem;
- private final JdbcQueueService queueService;
public JdbcAsyncService(QueueSystem queueSystem) {
this.queueSystem = queueSystem;
- this.queueService = queueSystem.createQueueService();
}
public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
@@ -40,8 +32,6 @@ public class JdbcAsyncService {
queues.put(queue.queue.name, queueController);
- log.info("registerQueue: LEAVE");
-
return queueController;
}
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
index 8aed91f..a343d42 100644
--- a/src/main/java/io/trygvis/async/QueueController.java
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -1,7 +1,6 @@
package io.trygvis.async;
import io.trygvis.queue.QueueExecutor;
-import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.SqlEffect;
import io.trygvis.queue.SqlEffectExecutor;
@@ -13,11 +12,10 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
@@ -47,6 +45,8 @@ public class QueueController {
private ScheduledThreadPoolExecutor executor;
+ private List<Runnable> stopListeners = new ArrayList<>();
+
public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
this.queueSystem = queueSystem;
this.req = req;
@@ -55,36 +55,50 @@ public class QueueController {
this.queue = queue;
}
- public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks,
- ScheduledThreadPoolExecutor executor) {
- ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor);
+ public void scheduleAll(final List<Task> tasks) throws InterruptedException {
+ ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor);
- List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size());
+ for (final Task task : tasks) {
+ service.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+ }
+ public void invokeAll(final List<Task> tasks) throws InterruptedException {
+ List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size());
for (final Task task : tasks) {
- results.add(completionService.submit(new Callable<TaskExecutionResult>() {
+ callables.add(new Callable<TaskExecutionResult>() {
@Override
public TaskExecutionResult call() throws Exception {
- return queue.applyTask(effect, task);
+ return queue.applyTask(taskEffect, task);
}
- }));
+ });
}
- for (Future<TaskExecutionResult> result : results) {
- while(!result.isDone()) {
+ executor.invokeAll(callables);
+ }
+
+ public void hint(final long id) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ Task task = queueSystem.createTaskDao(c).findById(id);
+
try {
- result.get();
- break;
+ scheduleAll(Collections.singletonList(task));
} catch (InterruptedException e) {
- if(shouldRun) {
- continue;
- }
- } catch (ExecutionException e) {
- log.error("Unexpected exception.", e);
- break;
+ throw new SQLException(e);
}
}
- }
+ });
+ }
+
+ public synchronized void addOnStopListener(Runnable runnable) {
+ stopListeners.add(runnable);
}
private class QueueThread implements Runnable {
@@ -102,7 +116,7 @@ public class QueueController {
log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
if (tasks.size() > 0) {
- executeTasks(req, taskEffect, tasks, executor);
+ invokeAll(tasks);
}
} catch (Throwable e) {
if (shouldRun) {
@@ -111,7 +125,7 @@ public class QueueController {
}
// If we found exactly the same number of tasks that we asked for, there is most likely more to go.
- if (tasks != null && tasks.size() == req.chunkSize) {
+ if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) {
log.info("Got a full chunk, continuing directly.");
continue;
}
@@ -122,7 +136,7 @@ public class QueueController {
checkForNewTasks = false;
} else {
try {
- wait(queue.queue.interval);
+ wait(req.interval(queue.queue));
} catch (InterruptedException e) {
// ignore
}
@@ -160,6 +174,14 @@ public class QueueController {
log.info("Stopping thread for queue {}", queue.queue.name);
+ for (Runnable runnable : stopListeners) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.error("Error while running stop listener " + runnable, e);
+ }
+ }
+
shouldRun = false;
// TODO: empty out the currently executing tasks.
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
index 3694c06..8edc720 100644
--- a/src/main/java/io/trygvis/async/QueueStats.java
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -4,12 +4,14 @@ public class QueueStats {
public final int totalMessageCount;
public final int okMessageCount;
public final int failedMessageCount;
+ public final int missedMessageCount;
public final int scheduledMessageCount;
- public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) {
+ public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) {
this.totalMessageCount = totalMessageCount;
this.okMessageCount = okMessageCount;
this.failedMessageCount = failedMessageCount;
+ this.missedMessageCount = missedMessageCount;
this.scheduledMessageCount = scheduledMessageCount;
}
@@ -19,6 +21,7 @@ public class QueueStats {
"totalMessageCount=" + totalMessageCount +
", okMessageCount=" + okMessageCount +
", failedMessageCount=" + failedMessageCount +
+ ", missedMessageCount=" + missedMessageCount +
", scheduledMessageCount=" + scheduledMessageCount +
'}';
}