diff options
| author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-02 12:32:29 +0200 | 
|---|---|---|
| committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-02 12:32:29 +0200 | 
| commit | 52084f7b4e6f50c90b3255cdf2eb9deab560c970 (patch) | |
| tree | eed9abd7fe9825aaacfd4fe24c8fd363cc41fed1 /src | |
| parent | 7d704feb86c44fca57941d223e8605b55fcf68f0 (diff) | |
| download | quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.gz quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.bz2 quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.tar.xz quartz-based-queue-52084f7b4e6f50c90b3255cdf2eb9deab560c970.zip | |
o Making some test cases.
Diffstat (limited to 'src')
17 files changed, 444 insertions, 178 deletions
| diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index e90a0e4..57c1af8 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -2,8 +2,8 @@ package io.trygvis.async;  import io.trygvis.queue.Queue;  import io.trygvis.queue.Task; -import org.quartz.SchedulerException; +import java.sql.SQLException;  import java.util.List;  /** @@ -16,9 +16,8 @@ public interface AsyncService {       * @param interval how often the queue should be polled for missed tasks in seconds.       * @param callable       * @return -     * @throws SchedulerException       */ -    Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; +    Queue registerQueue(final String name, final int interval, AsyncCallable callable);      Queue getQueue(String name); diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 4e78a37..c34330e 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -4,88 +4,66 @@ import io.trygvis.queue.Queue;  import io.trygvis.queue.QueueDao;  import io.trygvis.queue.Task;  import io.trygvis.queue.TaskDao; -import org.quartz.SchedulerException;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; +import java.sql.Connection; +import java.sql.SQLException;  import java.util.Date;  import java.util.HashMap;  import java.util.Map; -import java.util.concurrent.Executors;  import java.util.concurrent.ScheduledThreadPoolExecutor;  import static java.lang.System.currentTimeMillis;  import static java.lang.Thread.sleep;  import static java.util.Arrays.asList;  import static java.util.concurrent.TimeUnit.SECONDS; -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; -@Component -public class JdbcAsyncService implements AsyncService { +public class JdbcAsyncService {      private final Logger log = LoggerFactory.getLogger(getClass()); -    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); -      private final Map<String, QueueThread> queues = new HashMap<>(); -    @Autowired -    private TransactionTemplate transactionTemplate; - -    @Autowired -    private QueueDao queueDao; - -    @Autowired -    private TaskDao taskDao; +    public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException { +        QueueDao queueDao = new QueueDao(c); -    @Transactional(propagation = REQUIRED) -    public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {          log.info("registerQueue: ENTER");          Queue q = queueDao.findByName(name);          log.info("q = {}", q); -        final long interval_;          if (q == null) {              q = new Queue(name, interval);              queueDao.insert(q); -            interval_ = interval; -        } else { -            // Found an existing queue. Use the Settings from the database. -            interval_ = q.interval;          } -        final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); +        final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);          queues.put(name, queueThread); -        registerSynchronization(new TransactionSynchronizationAdapter() { -            public void afterCompletion(int status) { -                log.info("Transaction completed with status = {}", status); -                if (status == TransactionSynchronization.STATUS_COMMITTED) { -                    log.info("Starting thread for queue {} with poll interval = {}s", name, interval); -                    executor.scheduleAtFixedRate(new Runnable() { -                        public void run() { -                            queueThread.ping(); -                        } -                    }, 10, interval_, SECONDS); -                    Thread thread = new Thread(queueThread, name); -                    thread.setDaemon(true); -                    thread.start(); -                } -            } -        }); -          log.info("registerQueue: LEAVE");          return q;      } +    public void startQueue(ScheduledThreadPoolExecutor executor, String name) { +        final QueueThread queueThread = queues.get(name); + +        if (queueThread == null) { +            throw new RuntimeException("No such queue: " + name); +        } + +        long interval = queueThread.queue.interval; +        log.info("Starting thread for queue {} with poll interval = {}s", name, interval); +        executor.scheduleAtFixedRate(new Runnable() { +            public void run() { +                queueThread.ping(); +            } +        }, 10, interval, SECONDS); +        Thread thread = new Thread(queueThread, name); +        thread.setDaemon(true); +        thread.start(); +    } +      public Queue getQueue(String name) {          QueueThread queueThread = queues.get(name); @@ -96,16 +74,17 @@ public class JdbcAsyncService implements AsyncService {          return queueThread.queue;      } -    @Transactional(propagation = REQUIRED) -    public Task schedule(final Queue queue, String... args) { -        return scheduleInner(null, queue, args); +    public Task schedule(Connection c, final Queue queue, String... args) throws SQLException { +        return scheduleInner(c, null, queue, args);      } -    public Task schedule(long parent, Queue queue, String... args) { -        return scheduleInner(parent, queue, args); +    public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException { +        return scheduleInner(c, parent, queue, args);      } -    private Task scheduleInner(Long parent, final Queue queue, String... args) { +    private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException { +        TaskDao taskDao = new TaskDao(c); +          Date scheduled = new Date();          StringBuilder arguments = new StringBuilder(); @@ -114,27 +93,22 @@ public class JdbcAsyncService implements AsyncService {          }          long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); -        Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); +        Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));          log.info("Created task = {}", task); -        registerSynchronization(new TransactionSynchronizationAdapter() { -            public void afterCompletion(int status) { -                if (status == TransactionSynchronization.STATUS_COMMITTED) { -                    queues.get(queue.name).ping(); -                } -            } -        }); -          return task;      } -    @Transactional -    public Task await(Task task, long timeout) { +    public Task await(Connection c, Task task, long timeout) throws SQLException {          final long start = currentTimeMillis();          final long end = start + timeout;          while (currentTimeMillis() < end) { -            task = update(task); +            task = update(c, task); + +            if (task == null) { +                throw new RuntimeException("The task went away."); +            }              try {                  sleep(100); @@ -146,8 +120,9 @@ public class JdbcAsyncService implements AsyncService {          return task;      } -    @Transactional(readOnly = true) -    public Task update(Task ref) { +    public Task update(Connection c, Task ref) throws SQLException { +        TaskDao taskDao = new TaskDao(c); +          return taskDao.findById(ref.id);      }  } diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 69466df..00c46b4 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -5,37 +5,33 @@ import io.trygvis.queue.Task;  import io.trygvis.queue.TaskDao;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; +import java.sql.Connection; +import java.sql.SQLException;  import java.util.Date;  import java.util.List; +import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException; +  class QueueThread implements Runnable {      private final Logger log = LoggerFactory.getLogger(getClass()); -    public boolean shouldRun = true; - -    private boolean checkForNewTasks; +    private final SqlEffectExecutor sqlEffectExecutor; -    private boolean busy; +    private final AsyncService.AsyncCallable callable;      public final Queue queue; -    private final TaskDao taskDao; +    public boolean shouldRun = true; -    private final TransactionTemplate transactionTemplate; +    private boolean checkForNewTasks; -    private final AsyncService.AsyncCallable callable; +    private boolean busy; -    QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { -        this.queue = queue; -        this.taskDao = taskDao; -        this.transactionTemplate = transactionTemplate; +    QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) { +        this.sqlEffectExecutor = sqlEffectExecutor;          this.callable = callable; +        this.queue = queue;      }      public void ping() { @@ -52,19 +48,25 @@ class QueueThread implements Runnable {      public void run() {          while (shouldRun) {              try { -                List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { -                    public List<Task> doInTransaction(TransactionStatus status) { -                        return taskDao.findByNameAndCompletedIsNull(queue.name); +//                List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { +//                    public List<Task> doInTransaction(TransactionStatus status) { +//                        return taskDao.findByNameAndCompletedIsNull(queue.name); +//                    } +//                }); +                List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() { +                    @Override +                    public List<Task> doInConnection(Connection connection) throws SQLException { +                        return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name);                      }                  });                  log.info("Found {} tasks on queue {}", tasks.size(), queue.name); -                if(tasks.size() > 0) { +                if (tasks.size() > 0) {                      for (final Task task : tasks) {                          try {                              executeTask(task); -                        } catch (TransactionException | TaskFailureException e) { +                        } catch (SqlExecutionException | TaskFailureException e) {                              log.warn("Task execution failed", e);                          }                      } @@ -96,24 +98,44 @@ class QueueThread implements Runnable {      private void executeTask(final Task task) {          final Date run = new Date();          log.info("Setting last run on task. date = {}, task = {}", run, task); -        transactionTemplate.execute(new TransactionCallbackWithoutResult() { -            protected void doInTransactionWithoutResult(TransactionStatus status) { -                taskDao.update(task.registerRun()); +        sqlEffectExecutor.execute(new SqlEffect.Void() { +            @Override +            public void doInConnection(Connection connection) throws SQLException { +                new TaskDao(connection).update(task.registerRun());              }          }); +//        transactionTemplate.execute(new TransactionCallbackWithoutResult() { +//            protected void doInTransactionWithoutResult(TransactionStatus status) { +//                taskDao.update(task.registerRun()); +//            } +//        }); -        transactionTemplate.execute(new TransactionCallbackWithoutResult() { -            protected void doInTransactionWithoutResult(TransactionStatus status) { +        sqlEffectExecutor.execute(new SqlEffect.Void() { +            @Override +            public void doInConnection(Connection c) throws SQLException {                  try {                      callable.run(task.arguments);                      Date completed = new Date();                      Task t = task.registerComplete(completed);                      log.info("Completed task: {}", t); -                    taskDao.update(t); +                    new TaskDao(c).update(t);                  } catch (Exception e) {                      throw new TaskFailureException(e);                  }              }          }); +//        transactionTemplate.execute(new TransactionCallbackWithoutResult() { +//            protected void doInTransactionWithoutResult(TransactionStatus status) { +//                try { +//                    callable.run(task.arguments); +//                    Date completed = new Date(); +//                    Task t = task.registerComplete(completed); +//                    log.info("Completed task: {}", t); +//                    taskDao.update(t); +//                } catch (Exception e) { +//                    throw new TaskFailureException(e); +//                } +//            } +//        });      }  } diff --git a/src/main/java/io/trygvis/async/SqlEffect.java b/src/main/java/io/trygvis/async/SqlEffect.java new file mode 100644 index 0000000..d0c4e9b --- /dev/null +++ b/src/main/java/io/trygvis/async/SqlEffect.java @@ -0,0 +1,12 @@ +package io.trygvis.async; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface SqlEffect<A> { +    A doInConnection(Connection c) throws SQLException; + +    interface Void { +        void doInConnection(Connection c) throws SQLException; +    } +} diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java new file mode 100644 index 0000000..c8abbd3 --- /dev/null +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -0,0 +1,39 @@ +package io.trygvis.async; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlEffectExecutor { + +    private final DataSource dataSource; + +    public SqlEffectExecutor(DataSource dataSource) { +        this.dataSource = dataSource; +    } + +    public <A> A execute(SqlEffect<A> effect) { +        try (Connection c = dataSource.getConnection()) { +            return effect.doInConnection(c); +        } catch (SQLException e) { +            throw new SqlExecutionException(e); +        } +    } + +    public void execute(SqlEffect.Void effect) { +        try (Connection c = dataSource.getConnection()) { +            effect.doInConnection(c); +        } catch (SQLException e) { +            throw new SqlExecutionException(e); +        } +    } + +    public static class SqlExecutionException extends RuntimeException { +        public final SQLException exception; + +        public SqlExecutionException(SQLException ex) { +            super(ex); +            this.exception = ex; +        } +    } +} diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..8517c68 --- /dev/null +++ b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java @@ -0,0 +1,102 @@ +package io.trygvis.async.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.annotation.PostConstruct; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { +    private final Logger log = LoggerFactory.getLogger(getClass()); + +    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + +    private final TransactionTemplate transactionTemplate; + +    private final JdbcTemplate jdbcTemplate; + +    private SqlEffectExecutor sqlEffectExecutor; + +    final JdbcAsyncService jdbcAsyncService; + +    public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { +        this.transactionTemplate = transactionTemplate; +        this.jdbcTemplate = jdbcTemplate; +        jdbcAsyncService = new JdbcAsyncService(); +        sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); +    } + +    @Transactional(propagation = REQUIRED) +    public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) { +        return jdbcTemplate.execute(new ConnectionCallback<Queue>() { +            @Override +            public Queue doInConnection(Connection c) throws SQLException { + +                Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable); + +                registerSynchronization(new TransactionSynchronizationAdapter() { +                    public void afterCompletion(int status) { +                        log.info("Transaction completed with status = {}", status); +                        if (status == TransactionSynchronization.STATUS_COMMITTED) { +                            jdbcAsyncService.startQueue(executor, name); +                        } +                    } +                }); + +                log.info("registerQueue: LEAVE"); +                return q; +            } +        }); +    } + +    public Queue getQueue(String name) { +        return jdbcAsyncService.getQueue(name); +    } + +    @Transactional(propagation = REQUIRED) +    public Task schedule(final Queue queue, final String... args) { +        return jdbcTemplate.execute(new ConnectionCallback<Task>() { +            @Override +            public Task doInConnection(Connection c) throws SQLException { +                return jdbcAsyncService.schedule(c, queue, args); +            } +        }); +    } + +    public Task schedule(final long parent, final Queue queue, final String... args) { +        return jdbcTemplate.execute(new ConnectionCallback<Task>() { +            @Override +            public Task doInConnection(Connection c) throws SQLException { +                return jdbcAsyncService.schedule(c, parent, queue, args); +            } +        }); +    } + +    @Transactional(readOnly = true) +    public Task update(final Task ref) { +        return jdbcTemplate.execute(new ConnectionCallback<Task>() { +            @Override +            public Task doInConnection(Connection c) throws SQLException { +                return jdbcAsyncService.update(c, ref); +            } +        }); +    } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java index 63dde2a..2f69e11 100644 --- a/src/main/java/io/trygvis/queue/QueueDao.java +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -1,36 +1,45 @@  package io.trygvis.queue; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.stereotype.Component; - +import java.sql.Connection; +import java.sql.PreparedStatement;  import java.sql.ResultSet;  import java.sql.SQLException; -import static org.springframework.dao.support.DataAccessUtils.singleResult; - -@Component  public class QueueDao { -    @Autowired -    private JdbcTemplate jdbcTemplate; +    private final Connection connection; -    public Queue findByName(String name) { -        return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name)); +    public QueueDao(Connection connection) { +        this.connection = connection;      } -    public void insert(Queue q) { -        jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval); +    public Queue findByName(String name) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) { +            stmt.setString(1, name); +            ResultSet rs = stmt.executeQuery(); +            return rs.next() ? mapRow(rs) : null; +        }      } -    public void update(Queue q) { -        jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name); +    public void insert(Queue q) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) { +            int i = 1; +            stmt.setString(i++, q.name); +            stmt.setLong(i, q.interval); +            stmt.executeUpdate(); +        }      } -    private class QueueRowMapper implements RowMapper<Queue> { -        public Queue mapRow(ResultSet rs, int rowNum) throws SQLException { -            return new Queue(rs.getString(1), rs.getLong(2)); +    public void update(Queue q) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) { +            int i = 1; +            stmt.setLong(i++, q.interval); +            stmt.setString(i, q.name); +            stmt.executeUpdate();          }      } + +    public Queue mapRow(ResultSet rs) throws SQLException { +        return new Queue(rs.getString(1), rs.getLong(2)); +    }  } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index a59dcbb..5459933 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -1,69 +1,104 @@  package io.trygvis.queue; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.core.RowMapper; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - +import java.sql.Connection; +import java.sql.PreparedStatement;  import java.sql.ResultSet;  import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList;  import java.util.Collections;  import java.util.Date;  import java.util.List;  import static java.util.Arrays.asList; -import static org.springframework.transaction.annotation.Propagation.MANDATORY; -@Component  public class TaskDao { -    @Autowired -    private JdbcTemplate jdbcTemplate; +    private final Connection connection; + +    public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments"; -    public long insert(String queue, Date scheduled, String arguments) { -        return this.insert(null, queue, scheduled, arguments); +    public TaskDao(Connection connection) { +        this.connection = connection;      } -    @Transactional(propagation = MANDATORY) -    public long insert(Long parent, String queue, Date scheduled, String arguments) { -        jdbcTemplate.update("INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + -                "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)", parent, queue, scheduled, arguments); -        return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); +    public long insert(String queue, Date scheduled, String arguments) throws SQLException { +        return insert(null, queue, scheduled, arguments);      } -    @Transactional(propagation = MANDATORY) -    public Task findById(long id) { -        return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", -                new TaskRowMapper(), id); +    public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException { +        String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + +                "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)"; +        try (PreparedStatement stmt = connection.prepareStatement(sql)) { +            int i = 1; +            if (parent == null) { +                stmt.setNull(i++, Types.BIGINT); +            } else { +                stmt.setLong(i++, parent); +            } +            stmt.setString(i++, queue); +            stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); +            stmt.setString(i, arguments); +            stmt.executeUpdate(); +        } +        try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) { +            ResultSet rs = stmt.executeQuery(); +            rs.next(); +            return rs.getLong(1); +        }      } -    @Transactional(propagation = MANDATORY) -    public List<Task> findByNameAndCompletedIsNull(String name) { -        return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", -                new TaskRowMapper(), name); +    public Task findById(long id) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { +            ResultSet rs = stmt.executeQuery(); +            return rs.next() ? mapRow(rs) : null; +        }      } -    @Transactional(propagation = MANDATORY) -    public void update(Task task) { -        jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", -                task.scheduled, task.lastRun, task.runCount, task.completed, task.id); +    public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) { +            int i = 1; +            stmt.setString(i, name); +            ResultSet rs = stmt.executeQuery(); +            List<Task> list = new ArrayList<>(); +            while (rs.next()) { +                list.add(mapRow(rs)); +            } +            return list; +        }      } -    private class TaskRowMapper implements RowMapper<Task> { -        public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments"; +    public void update(Task task) throws SQLException { +        try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { +            int i = 1; +            stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); +            setTimestamp(stmt, i++, task.lastRun); +            stmt.setInt(i++, task.runCount); +            setTimestamp(stmt, i++, task.completed); +            stmt.setLong(i, task.id); +            stmt.executeUpdate(); +        } +    } -        public Task mapRow(ResultSet rs, int rowNum) throws SQLException { -            String arguments = rs.getString(8); -            return new Task( -                    rs.getLong(1), -                    rs.getLong(2), -                    rs.getString(3), -                    rs.getTimestamp(4), -                    rs.getTimestamp(5), -                    rs.getInt(6), -                    rs.getTimestamp(7), -                    arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList()); +    private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { +        if (date == null) { +            stmt.setNull(parameterIndex, Types.TIMESTAMP); +        } else { +            stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime()));          }      } + +    public Task mapRow(ResultSet rs) throws SQLException { +        String arguments = rs.getString(8); +        return new Task( +                rs.getLong(1), +                rs.getLong(2), +                rs.getString(3), +                rs.getTimestamp(4), +                rs.getTimestamp(5), +                rs.getInt(6), +                rs.getTimestamp(7), +                arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList()); +    }  } diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java new file mode 100644 index 0000000..af8f644 --- /dev/null +++ b/src/main/java/io/trygvis/spring/DefaultConfig.java @@ -0,0 +1,17 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.spring.SpringJdbcAsyncService; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.support.TransactionTemplate; + +@Configuration +public class DefaultConfig { + +    @Bean +    public AsyncService asyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { +        return new SpringJdbcAsyncService(transactionTemplate, jdbcTemplate); +    } +} diff --git a/src/main/resources/create.sql b/src/main/resources/create-postgresql.sql index f7f2939..39672f0 100644 --- a/src/main/resources/create.sql +++ b/src/main/resources/create-postgresql.sql @@ -2,7 +2,7 @@ BEGIN;  DROP TABLE IF EXISTS task;  DROP TABLE IF EXISTS queue; -DROP SEQUENCE IF EXISTS task_id; +DROP SEQUENCE IF EXISTS task_seq;  CREATE TABLE queue (    name     VARCHAR(100) NOT NULL, @@ -12,7 +12,7 @@ CREATE TABLE queue (  CREATE TABLE task (    id        BIGINT       NOT NULL, -  parent    BIGINT       NOT NULL, +  parent    BIGINT,    queue     VARCHAR(100) NOT NULL,    scheduled TIMESTAMP    NOT NULL,    last_run  TIMESTAMP, @@ -24,6 +24,6 @@ CREATE TABLE task (    CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id)  ); -CREATE SEQUENCE task_id; +CREATE SEQUENCE task_seq;  COMMIT; diff --git a/src/main/java/io/trygvis/model/Article.java b/src/test/java/io/trygvis/test/Article.java index e86c570..d4f54ce 100755 --- a/src/main/java/io/trygvis/model/Article.java +++ b/src/test/java/io/trygvis/test/Article.java @@ -1,4 +1,4 @@ -package io.trygvis.model; +package io.trygvis.test;  import javax.persistence.Entity;  import javax.persistence.GeneratedValue; diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java index 471b59d..f68cd5b 100755 --- a/src/main/java/io/trygvis/CreateArticleCallable.java +++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java @@ -1,6 +1,5 @@ -package io.trygvis; +package io.trygvis.test; -import io.trygvis.model.Article;  import io.trygvis.async.AsyncService;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory; diff --git a/src/main/java/io/trygvis/Main.java b/src/test/java/io/trygvis/test/Main.java index 08b9b75..721df61 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/test/java/io/trygvis/test/Main.java @@ -1,4 +1,4 @@ -package io.trygvis; +package io.trygvis.test;  import io.trygvis.async.AsyncService;  import io.trygvis.queue.Queue; @@ -11,8 +11,6 @@ import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.beans.factory.annotation.Qualifier;  import org.springframework.context.support.ClassPathXmlApplicationContext;  import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult;  import org.springframework.transaction.support.TransactionTemplate;  import java.util.ArrayList; @@ -83,13 +81,13 @@ public class Main {          final int count = 1;          log.info("Creating {} tasks", count); -        transactionTemplate.execute(new TransactionCallbackWithoutResult() { -            protected void doInTransactionWithoutResult(TransactionStatus status) { -                for (int i = 0; i < count; i++) { -                    tasks.add(asyncService.schedule(q)); -                } -            } -        }); +//        transactionTemplate.execute(new TransactionCallbackWithoutResult() { +//            protected void doInTransactionWithoutResult(TransactionStatus status) { +//                for (int i = 0; i < count; i++) { +//                    tasks.add(asyncService.schedule(q)); +//                } +//            } +//        });          log.info("Created {} tasks", count);          while (true) { diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java index a910855..aae28b9 100755 --- a/src/main/java/io/trygvis/UpdateArticleCallable.java +++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java @@ -1,4 +1,4 @@ -package io.trygvis; +package io.trygvis.test;  import io.trygvis.async.AsyncService;  import org.slf4j.Logger; diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java new file mode 100644 index 0000000..9a7a436 --- /dev/null +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -0,0 +1,59 @@ +package io.trygvis.test.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.Queue; +import io.trygvis.spring.DefaultConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static java.lang.System.getProperty; +import static java.lang.System.setProperty; +import static org.fest.assertions.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {TestConfig.class, DefaultConfig.class}) +public class PlainSpringTest { + +    @Autowired +    private AsyncService asyncService; + +    static { +        String username = getProperty("user.name"); +        setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); +        setProperty("database.username", username); +        setProperty("database.password", username); +    } + +    @Test +    public void testBasic() throws SQLException, InterruptedException { +        final AtomicReference<List<String>> ref = new AtomicReference<>(); +        Queue test = asyncService.registerQueue("test", 10, new AsyncService.AsyncCallable() { +            public void run(List<String> arguments) throws Exception { +                System.out.println("PlainSpringTest.run"); +                ref.set(arguments); +                synchronized (ref) { +                    ref.notify(); +                } +            } +        }); + +        synchronized (ref) { +            System.out.println("Scheduling task"); +            asyncService.schedule(test, "hello", "world"); +            System.out.println("Waiting"); +            ref.wait(1000); +        } + +        List<String> args = ref.get(); +        assertNotNull(args); +        assertThat(args).containsExactly("hello", "world"); +    } +} diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/test/java/io/trygvis/test/spring/TestConfig.java index df4b2e2..7853cb5 100755 --- a/src/main/java/io/trygvis/spring/Config.java +++ b/src/test/java/io/trygvis/test/spring/TestConfig.java @@ -1,4 +1,4 @@ -package io.trygvis.spring; +package io.trygvis.test.spring;  import com.jolbox.bonecp.BoneCPDataSource;  import org.springframework.beans.factory.annotation.Value; @@ -20,7 +20,7 @@ import javax.sql.DataSource;  @ComponentScan(basePackages = "io.trygvis")  @EnableTransactionManagement  //@EnableJpaRepositories(basePackages = "io.trygvis.data") -public class Config { +public class TestConfig {      @Bean      public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { diff --git a/src/main/resources/applicationContext.xml b/src/test/resources/applicationContext.xml index 2d33d8c..5f173b3 100755 --- a/src/main/resources/applicationContext.xml +++ b/src/test/resources/applicationContext.xml @@ -17,6 +17,6 @@         http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd">    <context:annotation-config/> -  <bean class="io.trygvis.spring.Config"/> +  <bean class="io.trygvis.test.spring.TestConfig"/>  </beans> | 
