aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java6
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java46
-rw-r--r--src/main/java/io/trygvis/async/QueueStats.java10
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java27
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java4
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java17
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java32
7 files changed, 113 insertions, 29 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index fd4b38b..57ab5c6 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -71,12 +71,10 @@ public class JdbcAsyncService {
}
public Task update(Connection c, Task ref) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
- return taskDao.findById(ref.id());
+ return queueSystem.createTaskDao(c).findById(ref.id());
}
- private QueueController getQueueThread(String name) {
+ private synchronized QueueController getQueueThread(String name) {
QueueController queueController = queues.get(name);
if (queueController == null) {
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
index ea1c42d..863d6a5 100644
--- a/src/main/java/io/trygvis/async/QueueController.java
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -1,6 +1,7 @@
package io.trygvis.async;
import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -9,9 +10,15 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
@@ -46,6 +53,38 @@ public class QueueController {
this.queue = queue;
}
+ public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks,
+ ScheduledThreadPoolExecutor executor) {
+ ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor);
+
+ List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size());
+
+ for (final Task task : tasks) {
+ results.add(completionService.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(effect, task);
+ }
+ }));
+ }
+
+ for (Future<TaskExecutionResult> result : results) {
+ while(!result.isDone()) {
+ try {
+ result.get();
+ break;
+ } catch (InterruptedException e) {
+ if(shouldRun) {
+ continue;
+ }
+ } catch (ExecutionException e) {
+ log.error("Unexpected exception.", e);
+ break;
+ }
+ }
+ }
+ }
+
private class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
@@ -61,7 +100,7 @@ public class QueueController {
log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
if (tasks.size() > 0) {
- queue.executeTasks(req, taskEffect, tasks, executor);
+ executeTasks(req, taskEffect, tasks, executor);
}
} catch (Throwable e) {
if (shouldRun) {
@@ -71,6 +110,7 @@ public class QueueController {
// If we found exactly the same number of tasks that we asked for, there is most likely more to go.
if (tasks != null && tasks.size() == req.chunkSize) {
+ log.info("Got a full chunk, continuing directly.");
continue;
}
@@ -101,7 +141,7 @@ public class QueueController {
throw new IllegalStateException("Already running");
}
- log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval);
+ log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval);
running = true;
this.executor = executor;
@@ -120,6 +160,8 @@ public class QueueController {
shouldRun = false;
+ // TODO: empty out the currently executing tasks.
+
thread.interrupt();
while (running) {
try {
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
index 7c75149..3694c06 100644
--- a/src/main/java/io/trygvis/async/QueueStats.java
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -12,4 +12,14 @@ public class QueueStats {
this.failedMessageCount = failedMessageCount;
this.scheduledMessageCount = scheduledMessageCount;
}
+
+ @Override
+ public String toString() {
+ return "QueueStats{" +
+ "totalMessageCount=" + totalMessageCount +
+ ", okMessageCount=" + okMessageCount +
+ ", failedMessageCount=" + failedMessageCount +
+ ", scheduledMessageCount=" + scheduledMessageCount +
+ '}';
+ }
}
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
index a1eb3b7..3739532 100644
--- a/src/main/java/io/trygvis/queue/QueueExecutor.java
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -10,11 +10,8 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
import static io.trygvis.queue.Task.TaskState.NEW;
public class QueueExecutor {
@@ -51,6 +48,10 @@ public class QueueExecutor {
}
}
+ public QueueStats getStats() {
+ return stats.toStats();
+ }
+
public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
log.info("Consuming tasks: request={}", req);
@@ -69,17 +70,7 @@ public class QueueExecutor {
} while (tasks.size() > 0);
}
- public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks,
- ScheduledThreadPoolExecutor executor) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- applyTasks(req, taskEffect, tasks);
- }
- });
- }
-
- private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
for (Task task : tasks) {
TaskExecutionResult result = applyTask(effect, task);
@@ -94,17 +85,17 @@ public class QueueExecutor {
* <p/>
* If the task fails, the status is set to error in a separate transaction.
*/
- private TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
try {
Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
@Override
public Integer doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).update(task.markProcessing());
+ return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
}
});
if (count == 0) {
- log.trace("Missed task {}", task.id());
+ log.warn("Missed task {}", task.id());
return MISSED;
}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index eee14ed..f4ce536 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -15,6 +15,10 @@ public interface QueueService {
// TODO: saveExceptions
public TaskExecutionRequest(long chunkSize, boolean stopOnError) {
+ if (chunkSize <= 0) {
+ throw new IllegalArgumentException("chunkSize has to be bigger than zero.");
+ }
+
this.chunkSize = chunkSize;
this.stopOnError = stopOnError;
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 5d77a41..365b44b 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -84,14 +84,27 @@ public class TaskDao {
}
public int update(Task task) throws SQLException {
- try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
+ return update(task, null);
+ }
+
+ public int update(Task task, TaskState state) throws SQLException {
+ String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?";
+
+ if (state != null) {
+ sql += " AND state=?";
+ }
+
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
int i = 1;
stmt.setString(i++, task.state.name());
stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
setTimestamp(stmt, i++, task.lastRun);
stmt.setInt(i++, task.runCount);
setTimestamp(stmt, i++, task.completed);
- stmt.setLong(i, task.id());
+ stmt.setLong(i++, task.id());
+ if (state != null) {
+ stmt.setString(i, state.name());
+ }
return stmt.executeUpdate();
}
}
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index dd478d7..361c2c5 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -2,6 +2,7 @@ package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.QueueController;
+import io.trygvis.async.QueueStats;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
@@ -16,10 +17,13 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.Task.newTask;
import static io.trygvis.test.DbUtil.createDataSource;
+import static java.lang.System.currentTimeMillis;
import static java.util.Collections.singletonList;
public class AsyncConsumerExample {
@@ -27,7 +31,7 @@ public class AsyncConsumerExample {
private static String inputName = "my-input";
private static String outputName = "my-output";
- private static int interval = 10;
+ private static int interval = 1000;
private static final TaskEffect adder = new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
@@ -68,8 +72,30 @@ public class AsyncConsumerExample {
QueueController controller = asyncService.registerQueue(input, req, adder);
- controller.start(new ScheduledThreadPoolExecutor(2));
- Thread.sleep(60 * 1000);
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ System.out.println(input.getStats());
+ System.out.println(output.getStats());
+ }
+ }, 1000, 1000);
+
+ long start = currentTimeMillis();
+ controller.start(new ScheduledThreadPoolExecutor(1));
+ Thread.sleep(5 * 1000);
controller.stop();
+ long end = currentTimeMillis();
+ timer.cancel();
+
+ QueueStats stats = input.getStats();
+
+ System.out.println("Summary:");
+ System.out.println(stats.toString());
+ System.out.println(output.getStats().toString());
+
+ long duration = end - start;
+ double rate = 1000 * ((double) stats.totalMessageCount) / duration;
+ System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s");
}
}