diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 |
commit | 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 (patch) | |
tree | c0bd7202845697207b04d518f613588df17d9e12 /src/main/java | |
download | jdbc-queue-master.tar.gz jdbc-queue-master.tar.bz2 jdbc-queue-master.tar.xz jdbc-queue-master.zip |
Diffstat (limited to 'src/main/java')
21 files changed, 1453 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java new file mode 100644 index 0000000..f2cfb6e --- /dev/null +++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java @@ -0,0 +1,152 @@ +package io.trygvis.activemq; + +import io.trygvis.async.QueueController; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.TextMessage; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.sql.SQLException; + +import static java.lang.Long.parseLong; +import static java.lang.System.arraycopy; +import static java.nio.charset.Charset.forName; +import static javax.jms.Session.AUTO_ACKNOWLEDGE; + +public class ActiveMqHinter implements AutoCloseable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueConnection c; + + private static final Charset utf8 = forName("utf-8"); + + public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException { + log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL()); + c = connectionFactory.createQueueConnection(); + c.start(); + log.info("Connected, clientId = {}", c.getClientID()); + } + + public void createHinter(final QueueController controller) throws JMSException { + QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(controller.queue.queue.name); + final MessageConsumer consumer = session.createConsumer(queue); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + if ((message instanceof TextMessage)) { + String body; + try { + TextMessage textMessage = (TextMessage) message; + body = textMessage.getText(); + } catch (JMSException e) { + log.warn("Exception while reading body.", e); + throw new RuntimeException("Exception while reading body.", e); + } + + consumeString(new StringReader(body), controller); + } else if (message instanceof BytesMessage) { + final BytesMessage bytesMessage = (BytesMessage) message; + consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller); + } else { + throw new RuntimeException("Unsupported message type: " + message.getClass()); + } + } + }); + + controller.addOnStopListener(new Runnable() { + @Override + public void run() { + try { + consumer.close(); + } catch (JMSException e) { + log.error("Error while closing JMS consumer", e); + } + } + }); + } + + private void consumeString(Reader reader, QueueController controller) { + try { + BufferedReader r = new BufferedReader(reader); + + String line = r.readLine(); + + while (line != null) { + for (String id : line.split(",")) { + controller.hint(parseLong(id.trim())); + } + line = r.readLine(); + } + } catch (IOException | SQLException e) { + log.warn("Could not consume body.", e); + throw new RuntimeException("Could not consume body.", e); + } catch (NumberFormatException e) { + log.warn("Could not consume body.", e); + throw e; + } + } + + public void close() throws JMSException { + c.close(); + } + + private static class ByteMessageInputStream extends InputStream { + private final BytesMessage bytesMessage; + + public ByteMessageInputStream(BytesMessage bytesMessage) { + this.bytesMessage = bytesMessage; + } + + @Override + public int read(byte[] b) throws IOException { + try { + return bytesMessage.readBytes(b); + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + byte[] b = new byte[len]; + try { + int read = bytesMessage.readBytes(b); + if (read == -1) { + return -1; + } + arraycopy(b, 0, out, off, read); + return read; + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read() throws IOException { + try { + return bytesMessage.readByte(); + } catch (javax.jms.MessageEOFException e) { + return -1; + } catch (JMSException e) { + throw new IOException(e); + } + } + } +} diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java new file mode 100755 index 0000000..9332596 --- /dev/null +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -0,0 +1,30 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +/** + * A simple framework for running tasks. + */ +public interface AsyncService { + + QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException; + + QueueExecutor getQueue(String name); + + Task schedule(Queue queue, Date scheduled, List<String> args); + + Task schedule(Queue queue, long parent, Date scheduled, List<String> args); + + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); +} diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..46f1f30 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,75 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; + +public class JdbcAsyncService { + private final Map<String, QueueController> queues = new HashMap<>(); + + private final QueueSystem queueSystem; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + } + + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); + } + + QueueController queueController = new QueueController(queueSystem, req, processor, queue); + + queues.put(queue.queue.name, queueController); + + return queueController; + } + + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; + } + + public Task await(Connection c, Task task, long timeout) throws SQLException { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(c, task); + + if (task == null) { + throw new RuntimeException("The task went away."); + } + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + + public Task update(Connection c, Task ref) throws SQLException { + return queueSystem.createTaskDao(c).findById(ref.id()); + } + + private synchronized QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); + + if (queueController == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + return queueController; + } +} diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java new file mode 100644 index 0000000..a343d42 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -0,0 +1,201 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueController { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final TaskEffect taskEffect; + + private final TaskExecutionRequest req; + + public final QueueExecutor queue; + + private boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean running; + + private Thread thread; + + private ScheduledThreadPoolExecutor executor; + + private List<Runnable> stopListeners = new ArrayList<>(); + + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { + this.queueSystem = queueSystem; + this.req = req; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.taskEffect = taskEffect; + this.queue = queue; + } + + public void scheduleAll(final List<Task> tasks) throws InterruptedException { + ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor); + + for (final Task task : tasks) { + service.submit(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + } + + public void invokeAll(final List<Task> tasks) throws InterruptedException { + List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size()); + for (final Task task : tasks) { + callables.add(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + + executor.invokeAll(callables); + } + + public void hint(final long id) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + Task task = queueSystem.createTaskDao(c).findById(id); + + try { + scheduleAll(Collections.singletonList(task)); + } catch (InterruptedException e) { + throw new SQLException(e); + } + } + }); + } + + public synchronized void addOnStopListener(Runnable runnable) { + stopListeners.add(runnable); + } + + private class QueueThread implements Runnable { + public void run() { + while (shouldRun) { + List<Task> tasks = null; + + try { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); + + if (tasks.size() > 0) { + invokeAll(tasks); + } + } catch (Throwable e) { + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } + } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { + log.info("Got a full chunk, continuing directly."); + continue; + } + + synchronized (this) { + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + } else { + try { + wait(req.interval(queue.queue)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + log.info("Thread for queue {} has stopped.", queue.queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } + + log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); + + running = true; + this.executor = executor; + + thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.queue.name); + + for (Runnable runnable : stopListeners) { + try { + runnable.run(); + } catch (Throwable e) { + log.error("Error while running stop listener " + runnable, e); + } + } + + shouldRun = false; + + // TODO: empty out the currently executing tasks. + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue + } + thread.interrupt(); + } + thread = null; + executor.shutdownNow(); + } +} diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java new file mode 100644 index 0000000..8edc720 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -0,0 +1,28 @@ +package io.trygvis.async; + +public class QueueStats { + public final int totalMessageCount; + public final int okMessageCount; + public final int failedMessageCount; + public final int missedMessageCount; + public final int scheduledMessageCount; + + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) { + this.totalMessageCount = totalMessageCount; + this.okMessageCount = okMessageCount; + this.failedMessageCount = failedMessageCount; + this.missedMessageCount = missedMessageCount; + this.scheduledMessageCount = scheduledMessageCount; + } + + @Override + public String toString() { + return "QueueStats{" + + "totalMessageCount=" + totalMessageCount + + ", okMessageCount=" + okMessageCount + + ", failedMessageCount=" + failedMessageCount + + ", missedMessageCount=" + missedMessageCount + + ", scheduledMessageCount=" + scheduledMessageCount + + '}'; + } +} diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java new file mode 100644 index 0000000..7278e17 --- /dev/null +++ b/src/main/java/io/trygvis/async/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.async; + +class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } +} diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java new file mode 100644 index 0000000..ef0b5bb --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -0,0 +1,55 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class JdbcQueueService { + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final Map<String, QueueExecutor> queues = new HashMap<>(); + + JdbcQueueService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + } + + public synchronized QueueExecutor getQueue(String name) { + QueueExecutor queueExecutor = queues.get(name); + + if (queueExecutor != null) { + return queueExecutor; + } + + throw new IllegalArgumentException("No such queue: " + name); + } + + public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException { + QueueExecutor queueExecutor = queues.get(name); + + if (queueExecutor != null) { + return queueExecutor; + } + + QueueDao queueDao = queueSystem.createQueueDao(c); + + Queue q = queueDao.findByName(name); + + if (q == null) { + if (!autoCreate) { + throw new SQLException("No such queue: '" + name + "'."); + } + + q = new Queue(name, interval); + queueDao.insert(q); + } + + queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q); + queues.put(name, queueExecutor); + return queueExecutor; + } +} diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java new file mode 100755 index 0000000..15003f7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Queue.java @@ -0,0 +1,24 @@ +package io.trygvis.queue; + +public class Queue { + + public final String name; + + public final long interval; + + public Queue(String name, long interval) { + this.name = name; + this.interval = interval; + } + + public Queue withInterval(int interval) { + return new Queue(name, interval); + } + + public String toString() { + return "Queue{" + + "name='" + name + '\'' + + ", interval=" + interval + + '}'; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java new file mode 100644 index 0000000..2f69e11 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -0,0 +1,45 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class QueueDao { + + private final Connection connection; + + public QueueDao(Connection connection) { + this.connection = connection; + } + + public Queue findByName(String name) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) { + stmt.setString(1, name); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } + } + + public void insert(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) { + int i = 1; + stmt.setString(i++, q.name); + stmt.setLong(i, q.interval); + stmt.executeUpdate(); + } + } + + public void update(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) { + int i = 1; + stmt.setLong(i++, q.interval); + stmt.setString(i, q.name); + stmt.executeUpdate(); + } + } + + public Queue mapRow(ResultSet rs) throws SQLException { + return new Queue(rs.getString(1), rs.getLong(2)); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..88e5b46 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,177 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + public final Queue queue; + + private final Stats stats = new Stats(); + + public enum TaskExecutionResult { + OK, + FAILED, + MISSED + } + + public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = sqlEffectExecutor; + this.queue = queue; + } + + private static class Stats { + public int total; + public int ok; + public int failed; + public int scheduled; + public int missed; + + public synchronized QueueStats toStats() { + return new QueueStats(total, ok, failed, missed, scheduled); + } + } + + public QueueStats getStats() { + return stats.toStats(); + } + + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List<Task> tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); + + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); + } + + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { + for (Task task : tasks) { + TaskExecutionResult result = applyTask(effect, task); + + if (result == FAILED && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + /** + * Executed each task in its own transaction. + * <p/> + * If the task fails, the status is set to error in a separate transaction. + */ + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + try { + Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); + } + }); + + if (count == 0) { + log.warn("Missed task {}", task.id()); + synchronized (stats) { + stats.missed++; + } + return MISSED; + } + + log.info("Executing task {}", task.id()); + + final List<Task> newTasks = effect.apply(task); + + final Date now = new Date(); + + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } + + queueSystem.createTaskDao(c).update(task.markOk(now)); + } + }); + + synchronized (stats) { + stats.total++; + stats.ok++; + } + + return OK; + } catch (Exception e) { + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + synchronized (stats) { + stats.total++; + stats.failed++; + } + + try { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + taskDao.update(task.markFailed(now)); + } + }); + } catch (SQLException e1) { + log.error("Error while marking task as failed.", e1); + } + + return FAILED; + } + } + + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); + } + + public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); + } + + public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); + } + + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + synchronized (stats) { + stats.scheduled++; + } + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java new file mode 100644 index 0000000..1c38f1f --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -0,0 +1,54 @@ +package io.trygvis.queue; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public interface QueueService { + QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException; + + void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException; + + public static class TaskExecutionRequest { + public final long chunkSize; + public final boolean stopOnError; + public final Long interval; + public final boolean continueOnFullChunk; + // TODO: saveExceptions + + public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + this(chunkSize, stopOnError, null, true); + } + + private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) { + this.chunkSize = chunkSize; + this.stopOnError = stopOnError; + this.interval = interval; + this.continueOnFullChunk = continueOnFullChunk; + } + + public TaskExecutionRequest interval(long interval) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + @Override + public String toString() { + return "TaskExecutionRequest{" + + "chunkSize=" + chunkSize + + ", stopOnError=" + stopOnError + + '}'; + } + + public long interval(Queue queue) { + if (interval != null) { + return interval; + } + + return queue.interval; + } + } +} diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java new file mode 100644 index 0000000..5b048b3 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueStats.java @@ -0,0 +1,20 @@ +package io.trygvis.queue; + +import java.util.EnumMap; + +public class QueueStats { + public final String name; + public final long totalTaskCount; + public final EnumMap<Task.TaskState, Long> states; + + public QueueStats(String name, EnumMap<Task.TaskState, Long> states) { + this.name = name; + this.states = states; + + long c = 0; + for (Long l : states.values()) { + c += l; + } + this.totalTaskCount = c; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java new file mode 100644 index 0000000..5954526 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -0,0 +1,61 @@ +package io.trygvis.queue; + +import io.trygvis.async.JdbcAsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; + +public class QueueSystem { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public final SqlEffectExecutor sqlEffectExecutor; + + private final JdbcQueueService queueService; + + private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + if (c.getAutoCommit()) { + throw new SQLException("The connection cannot be in auto-commit mode."); + } + + DatabaseMetaData metaData = c.getMetaData(); + String productName = metaData.getDatabaseProductName(); + String productVersion = metaData.getDatabaseProductVersion(); + + log.info("productName = " + productName); + log.info("productVersion = " + productVersion); + } + }); + + this.sqlEffectExecutor = sqlEffectExecutor; + queueService = new JdbcQueueService(this); + } + + /** + * Initializes the queue system. Use this as the first thing do as it will validate the database. + */ + public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + return new QueueSystem(sqlEffectExecutor); + } + + public JdbcQueueService createQueueService() { + return queueService; + } + + public QueueDao createQueueDao(Connection c) { + return new QueueDao(c); + } + + public TaskDao createTaskDao(Connection c) { + return new TaskDao(c); + } + + public JdbcAsyncService createAsyncService() { + return new JdbcAsyncService(this); + } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffect.java b/src/main/java/io/trygvis/queue/SqlEffect.java new file mode 100644 index 0000000..7864bcd --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffect.java @@ -0,0 +1,12 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface SqlEffect<A> { + A doInConnection(Connection c) throws SQLException; + + interface Void { + void doInConnection(Connection c) throws SQLException; + } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffectExecutor.java b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java new file mode 100644 index 0000000..92802da --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java @@ -0,0 +1,50 @@ +package io.trygvis.queue; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlEffectExecutor { + + private final DataSource dataSource; + + public SqlEffectExecutor(DataSource dataSource) { + this.dataSource = dataSource; + } + + public <A> A transaction(SqlEffect<A> effect) throws SQLException { +// int pid; + + try (Connection c = dataSource.getConnection()) { +// pid = getPid(c); +// System.out.println("pid = " + pid); + + boolean ok = false; + try { + A a = effect.doInConnection(c); + c.commit(); + ok = true; + return a; + } finally { +// System.out.println("Closing, pid = " + pid); + if (!ok) { + try { + c.rollback(); + } catch (SQLException e) { + // ignore + } + } + } + } + } + + public void transaction(final SqlEffect.Void effect) throws SQLException { + transaction(new SqlEffect<Object>() { + @Override + public Object doInConnection(Connection c) throws SQLException { + effect.doInConnection(c); + return null; + } + }); + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java new file mode 100755 index 0000000..8038050 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Task.java @@ -0,0 +1,117 @@ +package io.trygvis.queue; + +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.Task.TaskState.*; +import static java.util.Arrays.asList; + +public class Task { + + public enum TaskState { + NEW, + PROCESSING, + OK, + FAILED + } + + private final long id; + + public final Long parent; + + public final String queue; + + public final TaskState state; + + public final Date scheduled; + + public final Date lastRun; + + public final int runCount; + + public final Date completed; + + public final List<String> arguments; + + public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { + this.id = id; + this.parent = parent; + this.queue = queue; + this.state = state; + this.scheduled = scheduled; + this.lastRun = lastRun; + this.runCount = runCount; + this.completed = completed; + + this.arguments = arguments; + } + + public Task markProcessing() { + return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments); + } + + public Task markOk(Date completed) { + return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments); + } + + public Task markFailed(Date now) { + return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments); + } + + public String toString() { + return "Task{" + + "id=" + id + + ", parent=" + parent + + ", queue=" + queue + + ", state=" + state + + ", scheduled=" + scheduled + + ", lastRun=" + lastRun + + ", runCount=" + runCount + + ", completed=" + completed + + ", arguments='" + arguments + '\'' + + '}'; + } + + public long id() { + if (id == 0) { + throw new RuntimeException("This task has not been persisted yet."); + } + + return id; + } + + public boolean isDone() { + return completed != null; + } + + public Task childTask(String queue, Date scheduled, String... arguments) { + return new Task(0, id(), queue, NEW, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String queue, Date scheduled, String... arguments) { + return new Task(0, null, queue, NEW, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String queue, Date scheduled, List<String> arguments) { + return new Task(0, null, queue, NEW, scheduled, null, 0, null, arguments); + } + + public static List<String> stringToArguments(String arguments) { + return asList(arguments.split(",")); + } + + public static String argumentsToString(List<String> arguments) { + StringBuilder builder = new StringBuilder(); + for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) { + String argument = arguments.get(i); + if (argument.contains(",")) { + throw new RuntimeException("The argument string can't contain a comma."); + } + if (i > 0) { + builder.append(','); + } + builder.append(argument); + } + return builder.toString(); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..365b44b --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,148 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.EnumMap; +import java.util.List; + +import static io.trygvis.queue.Task.*; + +public class TaskDao { + + private final Connection c; + + public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments"; + + TaskDao(Connection c) { + this.c = c; + } + + public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException { + String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " + + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)"; + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + if (parent == null) { + stmt.setNull(i++, Types.BIGINT); + } else { + stmt.setLong(i++, parent); + } + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); + stmt.setString(i, argumentsToString(arguments)); + stmt.executeUpdate(); + } + try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) { + ResultSet rs = stmt.executeQuery(); + rs.next(); + return rs.getLong(1); + } + } + + public Task findById(long id) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + stmt.setLong(1, id); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } + } + + public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { + int i = 1; + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setLong(i, limit); + ResultSet rs = stmt.executeQuery(); + List<Task> list = new ArrayList<>(); + while (rs.next()) { + list.add(mapRow(rs)); + } + return list; + } + } + + public QueueStats findQueueStatsByName(String queue) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { + int i = 1; + stmt.setString(i, queue); + ResultSet rs = stmt.executeQuery(); + EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class); + while (rs.next()) { + states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); + } + return new QueueStats(queue, states); + } + } + + public int update(Task task) throws SQLException { + return update(task, null); + } + + public int update(Task task, TaskState state) throws SQLException { + String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + + if (state != null) { + sql += " AND state=?"; + } + + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + stmt.setString(i++, task.state.name()); + stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); + setTimestamp(stmt, i++, task.lastRun); + stmt.setInt(i++, task.runCount); + setTimestamp(stmt, i++, task.completed); + stmt.setLong(i++, task.id()); + if (state != null) { + stmt.setString(i, state.name()); + } + return stmt.executeUpdate(); + } + } + + public void setState(Task task, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { + int i = 1; + stmt.setString(i++, state.name()); + stmt.setLong(i, task.id()); + stmt.executeUpdate(); + } + } + + private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { + if (date == null) { + stmt.setNull(parameterIndex, Types.TIMESTAMP); + } else { + stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); + } + } + + public Task mapRow(ResultSet rs) throws SQLException { + String arguments = rs.getString(9); + int i = 1; + return new Task( + rs.getLong(i++), + rs.getLong(i++), + rs.getString(i++), + TaskState.valueOf(rs.getString(i++)), + rs.getTimestamp(i++), + rs.getTimestamp(i++), + rs.getInt(i++), + rs.getTimestamp(i), + arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList()); + } + + public void rollback() throws SQLException { + c.rollback(); + c.close(); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java new file mode 100644 index 0000000..186797f --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskEffect.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +import java.util.List; + +public interface TaskEffect { + List<Task> apply(Task task) throws Exception; +} diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java new file mode 100644 index 0000000..6890d58 --- /dev/null +++ b/src/main/java/io/trygvis/spring/DefaultConfig.java @@ -0,0 +1,31 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; +import java.sql.SQLException; + +@Configuration +public class DefaultConfig { + + @Bean + public QueueSystem queueSystem(DataSource ds) throws SQLException { + return QueueSystem.initialize(new SqlEffectExecutor(ds)); + } + + @Bean + public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + return new SpringJdbcAsyncService(queueSystem, jdbcTemplate); + } + + @Bean + public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + return new SpringQueueService(queueSystem, jdbcTemplate); + } +} diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..12dc71e --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java @@ -0,0 +1,110 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor; + + private final JdbcTemplate jdbcTemplate; + + private final JdbcAsyncService jdbcAsyncService; + + private final JdbcQueueService queueService; + + private final QueueSystem queueSystem; + + public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + this.queueSystem = queueSystem; + this.jdbcTemplate = jdbcTemplate; + jdbcAsyncService = new JdbcAsyncService(queueSystem); + queueService = queueSystem.createQueueService(); + executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + } + + @Transactional(propagation = REQUIRED) + public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException { + QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor>() { + @Override + public QueueExecutor doInConnection(Connection c) throws SQLException { + return queueService.lookupQueue(c, queue.name, queue.interval, true); + } + }); + + final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queueController.start(executor); + } + } + }); + + return queueController; + } + + public QueueExecutor getQueue(String name) { + return jdbcAsyncService.getQueue(name); + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, final Date scheduled, final List<String> args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + QueueExecutor queueExecutor = queueService.getQueue(queue.name); + return queueExecutor.schedule(c, scheduled, args); + } + }); + } + + public Task schedule(final Queue queue, final long parent, final Date scheduled, final List<String> args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + QueueExecutor queueExecutor = queueService.getQueue(queue.name); + return queueExecutor.schedule(c, parent, scheduled, args); + } + }); + } + + @Transactional(readOnly = true) + public Task update(final Task ref) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.update(c, ref); + } + }); + } +} diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java new file mode 100644 index 0000000..2027ab5 --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -0,0 +1,49 @@ +package io.trygvis.spring; + +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public class SpringQueueService implements QueueService { + + public final JdbcTemplate jdbcTemplate; + + public JdbcQueueService queueService; + + public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + this.queueService = queueSystem.createQueueService(); + } + + @Transactional + public QueueExecutor getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException { + return jdbcTemplate.execute(new ConnectionCallback<QueueExecutor>() { + @Override + public QueueExecutor doInConnection(Connection c) throws SQLException, DataAccessException { + return queueService.lookupQueue(c, name, interval, autoCreate); + } + }); + } + + @Transactional + public void schedule(final Queue queue, final Date scheduled, final List<String> arguments) throws SQLException { + jdbcTemplate.execute(new ConnectionCallback<Object>() { + @Override + public Object doInConnection(Connection c) throws SQLException, DataAccessException { + queueService.getQueue(queue.name).schedule(c, scheduled, arguments); + return null; + } + }); + } +} |