diff options
| author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 | 
|---|---|---|
| committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 | 
| commit | 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 (patch) | |
| tree | c0bd7202845697207b04d518f613588df17d9e12 /src/main/java/io | |
| download | jdbc-queue-master.tar.gz jdbc-queue-master.tar.bz2 jdbc-queue-master.tar.xz jdbc-queue-master.zip | |
Diffstat (limited to 'src/main/java/io')
21 files changed, 1453 insertions, 0 deletions
| diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java new file mode 100644 index 0000000..f2cfb6e --- /dev/null +++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java @@ -0,0 +1,152 @@ +package io.trygvis.activemq; + +import io.trygvis.async.QueueController; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.TextMessage; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.sql.SQLException; + +import static java.lang.Long.parseLong; +import static java.lang.System.arraycopy; +import static java.nio.charset.Charset.forName; +import static javax.jms.Session.AUTO_ACKNOWLEDGE; + +public class ActiveMqHinter implements AutoCloseable { +    private final Logger log = LoggerFactory.getLogger(getClass()); + +    private final QueueConnection c; + +    private static final Charset utf8 = forName("utf-8"); + +    public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException { +        log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL()); +        c = connectionFactory.createQueueConnection(); +        c.start(); +        log.info("Connected, clientId = {}", c.getClientID()); +    } + +    public void createHinter(final QueueController controller) throws JMSException { +        QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE); +        Queue queue = session.createQueue(controller.queue.queue.name); +        final MessageConsumer consumer = session.createConsumer(queue); + +        consumer.setMessageListener(new MessageListener() { +            @Override +            public void onMessage(Message message) { +                if ((message instanceof TextMessage)) { +                    String body; +                    try { +                        TextMessage textMessage = (TextMessage) message; +                        body = textMessage.getText(); +                    } catch (JMSException e) { +                        log.warn("Exception while reading body.", e); +                        throw new RuntimeException("Exception while reading body.", e); +                    } + +                    consumeString(new StringReader(body), controller); +                } else if (message instanceof BytesMessage) { +                    final BytesMessage bytesMessage = (BytesMessage) message; +                    consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller); +                } else { +                    throw new RuntimeException("Unsupported message type: " + message.getClass()); +                } +            } +        }); + +        controller.addOnStopListener(new Runnable() { +            @Override +            public void run() { +                try { +                    consumer.close(); +                } catch (JMSException e) { +                    log.error("Error while closing JMS consumer", e); +                } +            } +        }); +    } + +    private void consumeString(Reader reader, QueueController controller) { +        try { +            BufferedReader r = new BufferedReader(reader); + +            String line = r.readLine(); + +            while (line != null) { +                for (String id : line.split(",")) { +                    controller.hint(parseLong(id.trim())); +                } +                line = r.readLine(); +            } +        } catch (IOException | SQLException e) { +            log.warn("Could not consume body.", e); +            throw new RuntimeException("Could not consume body.", e); +        } catch (NumberFormatException e) { +            log.warn("Could not consume body.", e); +            throw e; +        } +    } + +    public void close() throws JMSException { +        c.close(); +    } + +    private static class ByteMessageInputStream extends InputStream { +        private final BytesMessage bytesMessage; + +        public ByteMessageInputStream(BytesMessage bytesMessage) { +            this.bytesMessage = bytesMessage; +        } + +        @Override +        public int read(byte[] b) throws IOException { +            try { +                return bytesMessage.readBytes(b); +            } catch (JMSException e) { +                throw new IOException(e); +            } +        } + +        @Override +        public int read(byte[] out, int off, int len) throws IOException { +            byte[] b = new byte[len]; +            try { +                int read = bytesMessage.readBytes(b); +                if (read == -1) { +                    return -1; +                } +                arraycopy(b, 0, out, off, read); +                return read; +            } catch (JMSException e) { +                throw new IOException(e); +            } +        } + +        @Override +        public int read() throws IOException { +            try { +                return bytesMessage.readByte(); +            } catch (javax.jms.MessageEOFException e) { +                return -1; +            } catch (JMSException e) { +                throw new IOException(e); +            } +        } +    } +} diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java new file mode 100755 index 0000000..9332596 --- /dev/null +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -0,0 +1,30 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +/** + * A simple framework for running tasks. + */ +public interface AsyncService { + +    QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException; + +    QueueExecutor getQueue(String name); + +    Task schedule(Queue queue, Date scheduled, List<String> args); + +    Task schedule(Queue queue, long parent, Date scheduled, List<String> args); + +    /** +     * Polls for a new state of the execution. +     */ +    Task update(Task ref); +} diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..46f1f30 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,75 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; + +public class JdbcAsyncService { +    private final Map<String, QueueController> queues = new HashMap<>(); + +    private final QueueSystem queueSystem; + +    public JdbcAsyncService(QueueSystem queueSystem) { +        this.queueSystem = queueSystem; +    } + +    public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { +        if (queues.containsKey(queue.queue.name)) { +            throw new IllegalArgumentException("Queue already exist."); +        } + +        QueueController queueController = new QueueController(queueSystem, req, processor, queue); + +        queues.put(queue.queue.name, queueController); + +        return queueController; +    } + +    public QueueExecutor getQueue(String name) { +        return getQueueThread(name).queue; +    } + +    public Task await(Connection c, Task task, long timeout) throws SQLException { +        final long start = currentTimeMillis(); +        final long end = start + timeout; + +        while (currentTimeMillis() < end) { +            task = update(c, task); + +            if (task == null) { +                throw new RuntimeException("The task went away."); +            } + +            try { +                sleep(100); +            } catch (InterruptedException e) { +                // break +            } +        } + +        return task; +    } + +    public Task update(Connection c, Task ref) throws SQLException { +        return queueSystem.createTaskDao(c).findById(ref.id()); +    } + +    private synchronized QueueController getQueueThread(String name) { +        QueueController queueController = queues.get(name); + +        if (queueController == null) { +            throw new RuntimeException("No such queue: '" + name + "'."); +        } +        return queueController; +    } +} diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java new file mode 100644 index 0000000..a343d42 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -0,0 +1,201 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueController { +    private final Logger log = LoggerFactory.getLogger(getClass()); + +    private final QueueSystem queueSystem; + +    private final SqlEffectExecutor sqlEffectExecutor; + +    private final TaskEffect taskEffect; + +    private final TaskExecutionRequest req; + +    public final QueueExecutor queue; + +    private boolean shouldRun = true; + +    private boolean checkForNewTasks; + +    private boolean running; + +    private Thread thread; + +    private ScheduledThreadPoolExecutor executor; + +    private List<Runnable> stopListeners = new ArrayList<>(); + +    public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { +        this.queueSystem = queueSystem; +        this.req = req; +        this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; +        this.taskEffect = taskEffect; +        this.queue = queue; +    } + +    public void scheduleAll(final List<Task> tasks) throws InterruptedException { +        ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor); + +        for (final Task task : tasks) { +            service.submit(new Callable<TaskExecutionResult>() { +                @Override +                public TaskExecutionResult call() throws Exception { +                    return queue.applyTask(taskEffect, task); +                } +            }); +        } +    } + +    public void invokeAll(final List<Task> tasks) throws InterruptedException { +        List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size()); +        for (final Task task : tasks) { +            callables.add(new Callable<TaskExecutionResult>() { +                @Override +                public TaskExecutionResult call() throws Exception { +                    return queue.applyTask(taskEffect, task); +                } +            }); +        } + +        executor.invokeAll(callables); +    } + +    public void hint(final long id) throws SQLException { +        sqlEffectExecutor.transaction(new SqlEffect.Void() { +            @Override +            public void doInConnection(Connection c) throws SQLException { +                Task task = queueSystem.createTaskDao(c).findById(id); + +                try { +                    scheduleAll(Collections.singletonList(task)); +                } catch (InterruptedException e) { +                    throw new SQLException(e); +                } +            } +        }); +    } + +    public synchronized void addOnStopListener(Runnable runnable) { +        stopListeners.add(runnable); +    } + +    private class QueueThread implements Runnable { +        public void run() { +            while (shouldRun) { +                List<Task> tasks = null; + +                try { +                    tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { +                        public List<Task> doInConnection(Connection c) throws SQLException { +                            return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); +                        } +                    }); + +                    log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); + +                    if (tasks.size() > 0) { +                        invokeAll(tasks); +                    } +                } catch (Throwable e) { +                    if (shouldRun) { +                        log.warn("Error while executing tasks.", e); +                    } +                } + +                // If we found exactly the same number of tasks that we asked for, there is most likely more to go. +                if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { +                    log.info("Got a full chunk, continuing directly."); +                    continue; +                } + +                synchronized (this) { +                    if (checkForNewTasks) { +                        log.info("Ping received!"); +                        checkForNewTasks = false; +                    } else { +                        try { +                            wait(req.interval(queue.queue)); +                        } catch (InterruptedException e) { +                            // ignore +                        } +                    } +                } +            } + +            log.info("Thread for queue {} has stopped.", queue.queue.name); +            running = false; +            synchronized (this) { +                this.notifyAll(); +            } +        } +    } + +    public synchronized void start(ScheduledThreadPoolExecutor executor) { +        if (running) { +            throw new IllegalStateException("Already running"); +        } + +        log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); + +        running = true; +        this.executor = executor; + +        thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); +        thread.setDaemon(true); +        thread.start(); +    } + +    public synchronized void stop() { +        if (!running) { +            return; +        } + +        log.info("Stopping thread for queue {}", queue.queue.name); + +        for (Runnable runnable : stopListeners) { +            try { +                runnable.run(); +            } catch (Throwable e) { +                log.error("Error while running stop listener " + runnable, e); +            } +        } + +        shouldRun = false; + +        // TODO: empty out the currently executing tasks. + +        thread.interrupt(); +        while (running) { +            try { +                wait(1000); +            } catch (InterruptedException e) { +                // continue +            } +            thread.interrupt(); +        } +        thread = null; +        executor.shutdownNow(); +    } +} diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java new file mode 100644 index 0000000..8edc720 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -0,0 +1,28 @@ +package io.trygvis.async; + +public class QueueStats { +    public final int totalMessageCount; +    public final int okMessageCount; +    public final int failedMessageCount; +    public final int missedMessageCount; +    public final int scheduledMessageCount; + +    public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) { +        this.totalMessageCount = totalMessageCount; +        this.okMessageCount = okMessageCount; +        this.failedMessageCount = failedMessageCount; +        this.missedMessageCount = missedMessageCount; +        this.scheduledMessageCount = scheduledMessageCount; +    } + +    @Override +    public String toString() { +        return "QueueStats{" + +                "totalMessageCount=" + totalMessageCount + +                ", okMessageCount=" + okMessageCount + +                ", failedMessageCount=" + failedMessageCount + +                ", missedMessageCount=" + missedMessageCount + +                ", scheduledMessageCount=" + scheduledMessageCount + +                '}'; +    } +} diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java new file mode 100644 index 0000000..7278e17 --- /dev/null +++ b/src/main/java/io/trygvis/async/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.async; + +class TaskFailureException extends RuntimeException { +    public TaskFailureException(Exception e) { +        super(e); +    } +} diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java new file mode 100644 index 0000000..ef0b5bb --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -0,0 +1,55 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class JdbcQueueService { + +    private final QueueSystem queueSystem; + +    private final SqlEffectExecutor sqlEffectExecutor; + +    private final Map<String, QueueExecutor> queues = new HashMap<>(); + +    JdbcQueueService(QueueSystem queueSystem) { +        this.queueSystem = queueSystem; +        this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; +    } + +    public synchronized QueueExecutor getQueue(String name) { +        QueueExecutor queueExecutor = queues.get(name); + +        if (queueExecutor != null) { +            return queueExecutor; +        } + +        throw new IllegalArgumentException("No such queue: " + name); +    } + +    public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException { +        QueueExecutor queueExecutor = queues.get(name); + +        if (queueExecutor != null) { +            return queueExecutor; +        } + +        QueueDao queueDao = queueSystem.createQueueDao(c); + +        Queue q = queueDao.findByName(name); + +        if (q == null) { +            if (!autoCreate) { +                throw new SQLException("No such queue: '" + name + "'."); +            } + +            q = new Queue(name, interval); +            queueDao.insert(q); +        } + +        queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q); +        queues.put(name, queueExecutor); +        return queueExecutor; +    } +} diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java new file mode 100755 index 0000000..15003f7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Queue.java @@ -0,0 +1,24 @@ +package io.trygvis.queue; + +public class Queue { + +    public final String name; + +    public final long interval; + +    public Queue(String name, long interval) { +        this.name = name; +        this.interval = interval; +    } + +    public Queue withInterval(int interval) { +        return new Queue(name, interval); +    } + +    public String toString() { +        return "Queue{" + +                "name='" + name + '\'' + +                ", interval=" + interval + +                '}'; +    } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java new file mode 100644 index 0000000..2f69e11 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -0,0 +1,45 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class QueueDao { + +    private final Connection connection; + +    public QueueDao(Connection connection) { +        this.connection = connection; +    } + +    public Queue findByName(String name) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) { +            stmt.setString(1, name); +            ResultSet rs = stmt.executeQuery(); +            return rs.next() ? mapRow(rs) : null; +        } +    } + +    public void insert(Queue q) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) { +            int i = 1; +            stmt.setString(i++, q.name); +            stmt.setLong(i, q.interval); +            stmt.executeUpdate(); +        } +    } + +    public void update(Queue q) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) { +            int i = 1; +            stmt.setLong(i++, q.interval); +            stmt.setString(i, q.name); +            stmt.executeUpdate(); +        } +    } + +    public Queue mapRow(ResultSet rs) throws SQLException { +        return new Queue(rs.getString(1), rs.getLong(2)); +    } +} diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..88e5b46 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,177 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { +    private final Logger log = LoggerFactory.getLogger(getClass()); + +    private final QueueSystem queueSystem; + +    private final SqlEffectExecutor sqlEffectExecutor; + +    public final Queue queue; + +    private final Stats stats = new Stats(); + +    public enum TaskExecutionResult { +        OK, +        FAILED, +        MISSED +    } + +    public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { +        this.queueSystem = queueSystem; +        this.sqlEffectExecutor = sqlEffectExecutor; +        this.queue = queue; +    } + +    private static class Stats { +        public int total; +        public int ok; +        public int failed; +        public int scheduled; +        public int missed; + +        public synchronized QueueStats toStats() { +            return new QueueStats(total, ok, failed, missed, scheduled); +        } +    } + +    public QueueStats getStats() { +        return stats.toStats(); +    } + +    public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { +        log.info("Consuming tasks: request={}", req); + +        List<Task> tasks; +        do { +            tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { +                @Override +                public List<Task> doInConnection(Connection c) throws SQLException { +                    return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); +                } +            }); + +            log.info("Consuming chunk with {} tasks", tasks.size()); + +            applyTasks(req, effect, tasks); +        } while (tasks.size() > 0); +    } + +    public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { +        for (Task task : tasks) { +            TaskExecutionResult result = applyTask(effect, task); + +            if (result == FAILED && req.stopOnError) { +                throw new RuntimeException("Error while executing task, id=" + task.id()); +            } +        } +    } + +    /** +     * Executed each task in its own transaction. +     * <p/> +     * If the task fails, the status is set to error in a separate transaction. +     */ +    public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { +        try { +            Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { +                @Override +                public Integer doInConnection(Connection c) throws SQLException { +                    return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); +                } +            }); + +            if (count == 0) { +                log.warn("Missed task {}", task.id()); +                synchronized (stats) { +                    stats.missed++; +                } +                return MISSED; +            } + +            log.info("Executing task {}", task.id()); + +            final List<Task> newTasks = effect.apply(task); + +            final Date now = new Date(); + +            log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + +            sqlEffectExecutor.transaction(new SqlEffect.Void() { +                @Override +                public void doInConnection(Connection c) throws SQLException { +                    for (Task newTask : newTasks) { +                        schedule(c, newTask); +                    } + +                    queueSystem.createTaskDao(c).update(task.markOk(now)); +                } +            }); + +            synchronized (stats) { +                stats.total++; +                stats.ok++; +            } + +            return OK; +        } catch (Exception e) { +            final Date now = new Date(); +            log.error("Unable to execute task, id=" + task.id(), e); + +            synchronized (stats) { +                stats.total++; +                stats.failed++; +            } + +            try { +                sqlEffectExecutor.transaction(new SqlEffect.Void() { +                    @Override +                    public void doInConnection(Connection c) throws SQLException { +                        TaskDao taskDao = queueSystem.createTaskDao(c); +                        taskDao.update(task.markFailed(now)); +                    } +                }); +            } catch (SQLException e1) { +                log.error("Error while marking task as failed.", e1); +            } + +            return FAILED; +        } +    } + +    public void schedule(Connection c, Task task) throws SQLException { +        schedule(c, task.queue, task.parent, task.scheduled, task.arguments); +    } + +    public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException { +        return schedule(c, queue.name, null, scheduled, arguments); +    } + +    public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException { +        return schedule(c, queue.name, parent, scheduled, arguments); +    } + +    private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { +        TaskDao taskDao = queueSystem.createTaskDao(c); + +        long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + +        synchronized (stats) { +            stats.scheduled++; +        } + +        return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); +    } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java new file mode 100644 index 0000000..1c38f1f --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -0,0 +1,54 @@ +package io.trygvis.queue; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public interface QueueService { +    QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException; + +    void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException; + +    public static class TaskExecutionRequest { +        public final long chunkSize; +        public final boolean stopOnError; +        public final Long interval; +        public final boolean continueOnFullChunk; +        // TODO: saveExceptions + +        public TaskExecutionRequest(long chunkSize, boolean stopOnError) { +            this(chunkSize, stopOnError, null, true); +        } + +        private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) { +            this.chunkSize = chunkSize; +            this.stopOnError = stopOnError; +            this.interval = interval; +            this.continueOnFullChunk = continueOnFullChunk; +        } + +        public TaskExecutionRequest interval(long interval) { +            return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); +        } + +        public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) { +            return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); +        } + +        @Override +        public String toString() { +            return "TaskExecutionRequest{" + +                    "chunkSize=" + chunkSize + +                    ", stopOnError=" + stopOnError + +                    '}'; +        } + +        public long interval(Queue queue) { +            if (interval != null) { +                return interval; +            } + +            return queue.interval; +        } +    } +} diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java new file mode 100644 index 0000000..5b048b3 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueStats.java @@ -0,0 +1,20 @@ +package io.trygvis.queue; + +import java.util.EnumMap; + +public class QueueStats { +    public final String name; +    public final long totalTaskCount; +    public final EnumMap<Task.TaskState, Long> states; + +    public QueueStats(String name, EnumMap<Task.TaskState, Long> states) { +        this.name = name; +        this.states = states; + +        long c = 0; +        for (Long l : states.values()) { +            c += l; +        } +        this.totalTaskCount = c; +    } +} diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java new file mode 100644 index 0000000..5954526 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -0,0 +1,61 @@ +package io.trygvis.queue; + +import io.trygvis.async.JdbcAsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; + +public class QueueSystem { +    private final Logger log = LoggerFactory.getLogger(getClass()); + +    public final SqlEffectExecutor sqlEffectExecutor; + +    private final JdbcQueueService queueService; + +    private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException { +        sqlEffectExecutor.transaction(new SqlEffect.Void() { +            @Override +            public void doInConnection(Connection c) throws SQLException { +                if (c.getAutoCommit()) { +                    throw new SQLException("The connection cannot be in auto-commit mode."); +                } + +                DatabaseMetaData metaData = c.getMetaData(); +                String productName = metaData.getDatabaseProductName(); +                String productVersion = metaData.getDatabaseProductVersion(); + +                log.info("productName = " + productName); +                log.info("productVersion = " + productVersion); +            } +        }); + +        this.sqlEffectExecutor = sqlEffectExecutor; +        queueService = new JdbcQueueService(this); +    } + +    /** +     * Initializes the queue system. Use this as the first thing do as it will validate the database. +     */ +    public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException { +        return new QueueSystem(sqlEffectExecutor); +    } + +    public JdbcQueueService createQueueService() { +        return queueService; +    } + +    public QueueDao createQueueDao(Connection c) { +        return new QueueDao(c); +    } + +    public TaskDao createTaskDao(Connection c) { +        return new TaskDao(c); +    } + +    public JdbcAsyncService createAsyncService() { +        return new JdbcAsyncService(this); +    } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffect.java b/src/main/java/io/trygvis/queue/SqlEffect.java new file mode 100644 index 0000000..7864bcd --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffect.java @@ -0,0 +1,12 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface SqlEffect<A> { +    A doInConnection(Connection c) throws SQLException; + +    interface Void { +        void doInConnection(Connection c) throws SQLException; +    } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffectExecutor.java b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java new file mode 100644 index 0000000..92802da --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java @@ -0,0 +1,50 @@ +package io.trygvis.queue; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlEffectExecutor { + +    private final DataSource dataSource; + +    public SqlEffectExecutor(DataSource dataSource) { +        this.dataSource = dataSource; +    } + +    public <A> A transaction(SqlEffect<A> effect) throws SQLException { +//        int pid; + +        try (Connection c = dataSource.getConnection()) { +//            pid = getPid(c); +//            System.out.println("pid = " + pid); + +            boolean ok = false; +            try { +                A a = effect.doInConnection(c); +                c.commit(); +                ok = true; +                return a; +            } finally { +//                System.out.println("Closing, pid = " + pid); +                if (!ok) { +                    try { +                        c.rollback(); +                    } catch (SQLException e) { +                        // ignore +                    } +                } +            } +        } +    } + +    public void transaction(final SqlEffect.Void effect) throws SQLException { +        transaction(new SqlEffect<Object>() { +            @Override +            public Object doInConnection(Connection c) throws SQLException { +                effect.doInConnection(c); +                return null; +            } +        }); +    } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java new file mode 100755 index 0000000..8038050 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Task.java @@ -0,0 +1,117 @@ +package io.trygvis.queue; + +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.Task.TaskState.*; +import static java.util.Arrays.asList; + +public class Task { + +    public enum TaskState { +        NEW, +        PROCESSING, +        OK, +        FAILED +    } + +    private final long id; + +    public final Long parent; + +    public final String queue; + +    public final TaskState state; + +    public final Date scheduled; + +    public final Date lastRun; + +    public final int runCount; + +    public final Date completed; + +    public final List<String> arguments; + +    public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { +        this.id = id; +        this.parent = parent; +        this.queue = queue; +        this.state = state; +        this.scheduled = scheduled; +        this.lastRun = lastRun; +        this.runCount = runCount; +        this.completed = completed; + +        this.arguments = arguments; +    } + +    public Task markProcessing() { +        return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments); +    } + +    public Task markOk(Date completed) { +        return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments); +    } + +    public Task markFailed(Date now) { +        return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments); +    } + +    public String toString() { +        return "Task{" + +                "id=" + id + +                ", parent=" + parent + +                ", queue=" + queue + +                ", state=" + state + +                ", scheduled=" + scheduled + +                ", lastRun=" + lastRun + +                ", runCount=" + runCount + +                ", completed=" + completed + +                ", arguments='" + arguments + '\'' + +                '}'; +    } + +    public long id() { +        if (id == 0) { +            throw new RuntimeException("This task has not been persisted yet."); +        } + +        return id; +    } + +    public boolean isDone() { +        return completed != null; +    } + +    public Task childTask(String queue, Date scheduled, String... arguments) { +        return new Task(0, id(), queue, NEW, scheduled, null, 0, null, asList(arguments)); +    } + +    public static Task newTask(String queue, Date scheduled, String... arguments) { +        return new Task(0, null, queue, NEW, scheduled, null, 0, null, asList(arguments)); +    } + +    public static Task newTask(String queue, Date scheduled, List<String> arguments) { +        return new Task(0, null, queue, NEW, scheduled, null, 0, null, arguments); +    } + +    public static List<String> stringToArguments(String arguments) { +        return asList(arguments.split(",")); +    } + +    public static String argumentsToString(List<String> arguments) { +        StringBuilder builder = new StringBuilder(); +        for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) { +            String argument = arguments.get(i); +            if (argument.contains(",")) { +                throw new RuntimeException("The argument string can't contain a comma."); +            } +            if (i > 0) { +                builder.append(','); +            } +            builder.append(argument); +        } +        return builder.toString(); +    } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..365b44b --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,148 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.EnumMap; +import java.util.List; + +import static io.trygvis.queue.Task.*; + +public class TaskDao { + +    private final Connection c; + +    public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments"; + +    TaskDao(Connection c) { +        this.c = c; +    } + +    public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException { +        String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " + +                "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)"; +        try (PreparedStatement stmt = c.prepareStatement(sql)) { +            int i = 1; +            if (parent == null) { +                stmt.setNull(i++, Types.BIGINT); +            } else { +                stmt.setLong(i++, parent); +            } +            stmt.setString(i++, queue); +            stmt.setString(i++, state.name()); +            stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); +            stmt.setString(i, argumentsToString(arguments)); +            stmt.executeUpdate(); +        } +        try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) { +            ResultSet rs = stmt.executeQuery(); +            rs.next(); +            return rs.getLong(1); +        } +    } + +    public Task findById(long id) throws SQLException { +        try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { +            stmt.setLong(1, id); +            ResultSet rs = stmt.executeQuery(); +            return rs.next() ? mapRow(rs) : null; +        } +    } + +    public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { +        try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { +            int i = 1; +            stmt.setString(i++, queue); +            stmt.setString(i++, state.name()); +            stmt.setLong(i, limit); +            ResultSet rs = stmt.executeQuery(); +            List<Task> list = new ArrayList<>(); +            while (rs.next()) { +                list.add(mapRow(rs)); +            } +            return list; +        } +    } + +    public QueueStats findQueueStatsByName(String queue) throws SQLException { +        try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { +            int i = 1; +            stmt.setString(i, queue); +            ResultSet rs = stmt.executeQuery(); +            EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class); +            while (rs.next()) { +                states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); +            } +            return new QueueStats(queue, states); +        } +    } + +    public int update(Task task) throws SQLException { +        return update(task, null); +    } + +    public int update(Task task, TaskState state) throws SQLException { +        String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + +        if (state != null) { +            sql += " AND state=?"; +        } + +        try (PreparedStatement stmt = c.prepareStatement(sql)) { +            int i = 1; +            stmt.setString(i++, task.state.name()); +            stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); +            setTimestamp(stmt, i++, task.lastRun); +            stmt.setInt(i++, task.runCount); +            setTimestamp(stmt, i++, task.completed); +            stmt.setLong(i++, task.id()); +            if (state != null) { +                stmt.setString(i, state.name()); +            } +            return stmt.executeUpdate(); +        } +    } + +    public void setState(Task task, TaskState state) throws SQLException { +        try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { +            int i = 1; +            stmt.setString(i++, state.name()); +            stmt.setLong(i, task.id()); +            stmt.executeUpdate(); +        } +    } + +    private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { +        if (date == null) { +            stmt.setNull(parameterIndex, Types.TIMESTAMP); +        } else { +            stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); +        } +    } + +    public Task mapRow(ResultSet rs) throws SQLException { +        String arguments = rs.getString(9); +        int i = 1; +        return new Task( +                rs.getLong(i++), +                rs.getLong(i++), +                rs.getString(i++), +                TaskState.valueOf(rs.getString(i++)), +                rs.getTimestamp(i++), +                rs.getTimestamp(i++), +                rs.getInt(i++), +                rs.getTimestamp(i), +                arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList()); +    } + +    public void rollback() throws SQLException { +        c.rollback(); +        c.close(); +    } +} diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java new file mode 100644 index 0000000..186797f --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskEffect.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +import java.util.List; + +public interface TaskEffect { +    List<Task> apply(Task task) throws Exception; +} diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java new file mode 100644 index 0000000..6890d58 --- /dev/null +++ b/src/main/java/io/trygvis/spring/DefaultConfig.java @@ -0,0 +1,31 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; +import java.sql.SQLException; + +@Configuration +public class DefaultConfig { + +    @Bean +    public QueueSystem queueSystem(DataSource ds) throws SQLException { +        return QueueSystem.initialize(new SqlEffectExecutor(ds)); +    } + +    @Bean +    public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { +        return new SpringJdbcAsyncService(queueSystem, jdbcTemplate); +    } + +    @Bean +    public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { +        return new SpringQueueService(queueSystem, jdbcTemplate); +    } +} diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..12dc71e --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java @@ -0,0 +1,110 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { +    private final Logger log = LoggerFactory.getLogger(getClass()); + +    private final ScheduledThreadPoolExecutor executor; + +    private final JdbcTemplate jdbcTemplate; + +    private final JdbcAsyncService jdbcAsyncService; + +    private final JdbcQueueService queueService; + +    private final QueueSystem queueSystem; + +    public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { +        this.queueSystem = queueSystem; +        this.jdbcTemplate = jdbcTemplate; +        jdbcAsyncService = new JdbcAsyncService(queueSystem); +        queueService = queueSystem.createQueueService(); +        executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); +    } + +    @Transactional(propagation = REQUIRED) +    public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException { +        QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor>() { +            @Override +            public QueueExecutor doInConnection(Connection c) throws SQLException { +                return queueService.lookupQueue(c, queue.name, queue.interval, true); +            } +        }); + +        final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor); + +        registerSynchronization(new TransactionSynchronizationAdapter() { +            public void afterCompletion(int status) { +                log.info("Transaction completed with status = {}", status); +                if (status == TransactionSynchronization.STATUS_COMMITTED) { +                    queueController.start(executor); +                } +            } +        }); + +        return queueController; +    } + +    public QueueExecutor getQueue(String name) { +        return jdbcAsyncService.getQueue(name); +    } + +    @Transactional(propagation = REQUIRED) +    public Task schedule(final Queue queue, final Date scheduled, final List<String> args) { +        return jdbcTemplate.execute(new ConnectionCallback<Task>() { +            @Override +            public Task doInConnection(Connection c) throws SQLException { +                QueueExecutor queueExecutor = queueService.getQueue(queue.name); +                return queueExecutor.schedule(c, scheduled, args); +            } +        }); +    } + +    public Task schedule(final Queue queue, final long parent, final Date scheduled, final List<String> args) { +        return jdbcTemplate.execute(new ConnectionCallback<Task>() { +            @Override +            public Task doInConnection(Connection c) throws SQLException { +                QueueExecutor queueExecutor = queueService.getQueue(queue.name); +                return queueExecutor.schedule(c, parent, scheduled, args); +            } +        }); +    } + +    @Transactional(readOnly = true) +    public Task update(final Task ref) { +        return jdbcTemplate.execute(new ConnectionCallback<Task>() { +            @Override +            public Task doInConnection(Connection c) throws SQLException { +                return jdbcAsyncService.update(c, ref); +            } +        }); +    } +} diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java new file mode 100644 index 0000000..2027ab5 --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -0,0 +1,49 @@ +package io.trygvis.spring; + +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public class SpringQueueService implements QueueService { + +    public final JdbcTemplate jdbcTemplate; + +    public JdbcQueueService queueService; + +    public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { +        this.jdbcTemplate = jdbcTemplate; +        this.queueService = queueSystem.createQueueService(); +    } + +    @Transactional +    public QueueExecutor getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException { +        return jdbcTemplate.execute(new ConnectionCallback<QueueExecutor>() { +            @Override +            public QueueExecutor doInConnection(Connection c) throws SQLException, DataAccessException { +                return queueService.lookupQueue(c, name, interval, autoCreate); +            } +        }); +    } + +    @Transactional +    public void schedule(final Queue queue, final Date scheduled, final List<String> arguments) throws SQLException { +        jdbcTemplate.execute(new ConnectionCallback<Object>() { +            @Override +            public Object doInConnection(Connection c) throws SQLException, DataAccessException { +                queueService.getQueue(queue.name).schedule(c, scheduled, arguments); +                return null; +            } +        }); +    } +} | 
