From 7465fdb9aa847d29dacc56adbe473f1c1ceb298e Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 4 Jun 2013 20:54:56 +0200 Subject: o Creating a QueueService on top of the DAOs. --- pom.xml | 2 +- src/main/java/io/trygvis/async/AsyncService.java | 13 +-- .../java/io/trygvis/async/JdbcAsyncService.java | 39 +++----- .../async/spring/SpringJdbcAsyncService.java | 100 --------------------- .../java/io/trygvis/queue/JdbcQueueService.java | 91 +++++++++++++++++++ src/main/java/io/trygvis/queue/QueueService.java | 17 ++++ src/main/java/io/trygvis/queue/Task.java | 39 +++++++- src/main/java/io/trygvis/queue/TaskDao.java | 14 +-- src/main/java/io/trygvis/spring/DefaultConfig.java | 12 ++- .../io/trygvis/spring/SpringJdbcAsyncService.java | 90 +++++++++++++++++++ .../java/io/trygvis/spring/SpringQueueService.java | 70 +++++++++++++++ src/test/java/io/trygvis/test/DbUtil.java | 47 ++++++++++ src/test/java/io/trygvis/test/Main.java | 8 +- .../java/io/trygvis/test/PlainJavaExample.java | 70 +++++++++++++++ .../io/trygvis/test/spring/PlainSpringTest.java | 10 ++- .../java/io/trygvis/test/spring/TestConfig.java | 1 + 16 files changed, 469 insertions(+), 154 deletions(-) delete mode 100644 src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java create mode 100644 src/main/java/io/trygvis/queue/JdbcQueueService.java create mode 100644 src/main/java/io/trygvis/queue/QueueService.java create mode 100644 src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java create mode 100644 src/main/java/io/trygvis/spring/SpringQueueService.java create mode 100644 src/test/java/io/trygvis/test/DbUtil.java create mode 100644 src/test/java/io/trygvis/test/PlainJavaExample.java diff --git a/pom.xml b/pom.xml index 2527f35..a86ce8d 100755 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ com.jolbox - bonecp-spring + bonecp 0.7.1.RELEASE test diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index 57c1af8..17d53e9 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -3,7 +3,6 @@ package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; -import java.sql.SQLException; import java.util.List; /** @@ -11,19 +10,13 @@ import java.util.List; */ public interface AsyncService { - /** - * @param name - * @param interval how often the queue should be polled for missed tasks in seconds. - * @param callable - * @return - */ - Queue registerQueue(final String name, final int interval, AsyncCallable callable); + void registerQueue(Queue queue, final AsyncService.AsyncCallable callable); Queue getQueue(String name); - Task schedule(Queue queue, String... args); + Task schedule(Queue queue, List args); - Task schedule(long parent, Queue queue, String... args); + Task schedule(long parent, Queue queue, List args); /** * Polls for a new state of the execution. diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index c34330e..310c59b 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,7 +1,6 @@ package io.trygvis.async; import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueDao; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import org.slf4j.Logger; @@ -11,12 +10,12 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; -import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; public class JdbcAsyncService { @@ -24,25 +23,12 @@ public class JdbcAsyncService { private final Map queues = new HashMap<>(); - public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException { - QueueDao queueDao = new QueueDao(c); + public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) { + final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue); - log.info("registerQueue: ENTER"); - - Queue q = queueDao.findByName(name); - - log.info("q = {}", q); - - if (q == null) { - q = new Queue(name, interval); - queueDao.insert(q); - } - - final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q); - queues.put(name, queueThread); + queues.put(queue.name, queueThread); log.info("registerQueue: LEAVE"); - return q; } public void startQueue(ScheduledThreadPoolExecutor executor, String name) { @@ -74,26 +60,21 @@ public class JdbcAsyncService { return queueThread.queue; } - public Task schedule(Connection c, final Queue queue, String... args) throws SQLException { + public Task schedule(Connection c, final Queue queue, List args) throws SQLException { return scheduleInner(c, null, queue, args); } - public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException { + public Task schedule(Connection c, long parent, Queue queue, List args) throws SQLException { return scheduleInner(c, parent, queue, args); } - private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException { + private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { TaskDao taskDao = new TaskDao(c); Date scheduled = new Date(); - StringBuilder arguments = new StringBuilder(); - for (String arg : args) { - arguments.append(arg).append(' '); - } - - long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); - Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args)); + long id = taskDao.insert(parent, queue.name, scheduled, args); + Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args); log.info("Created task = {}", task); return task; @@ -123,6 +104,6 @@ public class JdbcAsyncService { public Task update(Connection c, Task ref) throws SQLException { TaskDao taskDao = new TaskDao(c); - return taskDao.findById(ref.id); + return taskDao.findById(ref.id()); } } diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java deleted file mode 100644 index 327dffa..0000000 --- a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.trygvis.async.spring; - -import io.trygvis.async.AsyncService; -import io.trygvis.async.JdbcAsyncService; -import io.trygvis.async.SqlEffectExecutor; -import io.trygvis.queue.Queue; -import io.trygvis.queue.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.jdbc.core.ConnectionCallback; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionTemplate; - -import java.sql.Connection; -import java.sql.SQLException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static org.springframework.transaction.annotation.Propagation.REQUIRED; -import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; - -public class SpringJdbcAsyncService implements AsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - private final TransactionTemplate transactionTemplate; - - private final JdbcTemplate jdbcTemplate; - - private SqlEffectExecutor sqlEffectExecutor; - - final JdbcAsyncService jdbcAsyncService; - - public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { - this.transactionTemplate = transactionTemplate; - this.jdbcTemplate = jdbcTemplate; - jdbcAsyncService = new JdbcAsyncService(); - sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); - } - - @Transactional(propagation = REQUIRED) - public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) { - return jdbcTemplate.execute(new ConnectionCallback() { - @Override - public Queue doInConnection(Connection c) throws SQLException { - - Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable); - - registerSynchronization(new TransactionSynchronizationAdapter() { - public void afterCompletion(int status) { - log.info("Transaction completed with status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - jdbcAsyncService.startQueue(executor, name); - } - } - }); - - log.info("registerQueue: LEAVE"); - return q; - } - }); - } - - public Queue getQueue(String name) { - return jdbcAsyncService.getQueue(name); - } - - @Transactional(propagation = REQUIRED) - public Task schedule(final Queue queue, final String... args) { - return jdbcTemplate.execute(new ConnectionCallback() { - @Override - public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.schedule(c, queue, args); - } - }); - } - - public Task schedule(final long parent, final Queue queue, final String... args) { - return jdbcTemplate.execute(new ConnectionCallback() { - @Override - public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.schedule(c, parent, queue, args); - } - }); - } - - @Transactional(readOnly = true) - public Task update(final Task ref) { - return jdbcTemplate.execute(new ConnectionCallback() { - @Override - public Task doInConnection(Connection c) throws SQLException { - return jdbcAsyncService.update(c, ref); - } - }); - } -} diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java new file mode 100644 index 0000000..793333d --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -0,0 +1,91 @@ +package io.trygvis.queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public class JdbcQueueService { + + private Logger log = LoggerFactory.getLogger(getClass()); + + private JdbcQueueService(Connection c) throws SQLException { + if (c.getAutoCommit()) { + throw new SQLException("The connection cannot be in auto-commit mode."); + } + + DatabaseMetaData metaData = c.getMetaData(); + String productName = metaData.getDatabaseProductName(); + String productVersion = metaData.getDatabaseProductVersion(); + + log.info("productName = " + productName); + log.info("productVersion = " + productVersion); + } + + public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException { + TaskDao taskDao = createTaskDao(c); + + List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + log.trace("Got {} tasks.", tasks.size()); + + for (Task task : tasks) { + log.trace("Executing task {}", task.id()); + try { + List newTasks = effect.consume(task); + log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size()); + + Date now = new Date(); + + task = task.registerComplete(now); + + for (Task newTask : newTasks) { + taskDao.insert(task.id(), newTask.queue, now, newTask.arguments); + } + + taskDao.update(task); + } catch (Throwable e) { + log.error("Unable to execute task, id=" + task.id(), e); + } + c.commit(); + } + } + + public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { + QueueDao queueDao = createQueueDao(c); + + Queue q = queueDao.findByName(name); + + if (q == null) { + if (!autoCreate) { + throw new SQLException("No such queue: '" + name + "'."); + } + + q = new Queue(name, interval); + queueDao.insert(q); + } + + return q; + } + + public void schedule(Connection c, Queue queue, Date scheduled, List arguments) throws SQLException { + TaskDao taskDao = createTaskDao(c); + + taskDao.insert(queue.name, scheduled, arguments); + } + + public static JdbcQueueService createQueueService(Connection c) throws SQLException { + return new JdbcQueueService(c); + } + + public QueueDao createQueueDao(Connection c) { + return new QueueDao(c); + } + + public TaskDao createTaskDao(Connection c) { + return new TaskDao(c); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java new file mode 100644 index 0000000..2111013 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -0,0 +1,17 @@ +package io.trygvis.queue; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public interface QueueService { + void consume(Queue queue, TaskEffect effect) throws SQLException; + + Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException; + + void schedule(Queue queue, Date scheduled, List arguments) throws SQLException; + + public static interface TaskEffect { + List consume(Task task) throws Exception; + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java index 2b1103b..1af40d7 100755 --- a/src/main/java/io/trygvis/queue/Task.java +++ b/src/main/java/io/trygvis/queue/Task.java @@ -3,9 +3,11 @@ package io.trygvis.queue; import java.util.Date; import java.util.List; +import static java.util.Arrays.asList; + public class Task { - public final long id; + private final long id; public final Long parent; @@ -54,7 +56,42 @@ public class Task { '}'; } + public long id() { + if (id == 0) { + throw new RuntimeException("This task has not been persisted yet."); + } + + return id; + } + public boolean isDone() { return completed != null; } + + public static Task newTask(String name, Date scheduled, String... arguments) { + return new Task(0, 0l, name, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String name, Date scheduled, List arguments) { + return new Task(0, 0l, name, scheduled, null, 0, null, arguments); + } + + public static List stringToArguments(String arguments) { + return asList(arguments.split(",")); + } + + public static String argumentsToString(List arguments) { + StringBuilder builder = new StringBuilder(); + for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) { + String argument = arguments.get(i); + if (argument.contains(",")) { + throw new RuntimeException("The argument string can't contain a comma."); + } + if (i > 0) { + builder.append(','); + } + builder.append(argument); + } + return builder.toString(); + } } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 5459933..3aa2ac2 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -11,7 +11,8 @@ import java.util.Collections; import java.util.Date; import java.util.List; -import static java.util.Arrays.asList; +import static io.trygvis.queue.Task.argumentsToString; +import static io.trygvis.queue.Task.stringToArguments; public class TaskDao { @@ -23,11 +24,11 @@ public class TaskDao { this.connection = connection; } - public long insert(String queue, Date scheduled, String arguments) throws SQLException { + public long insert(String queue, Date scheduled, List arguments) throws SQLException { return insert(null, queue, scheduled, arguments); } - public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException { + public long insert(Long parent, String queue, Date scheduled, List arguments) throws SQLException { String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { @@ -39,7 +40,7 @@ public class TaskDao { } stmt.setString(i++, queue); stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); - stmt.setString(i, arguments); + stmt.setString(i, argumentsToString(arguments)); stmt.executeUpdate(); } try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) { @@ -51,6 +52,7 @@ public class TaskDao { public Task findById(long id) throws SQLException { try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + stmt.setLong(1, id); ResultSet rs = stmt.executeQuery(); return rs.next() ? mapRow(rs) : null; } @@ -76,7 +78,7 @@ public class TaskDao { setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); - stmt.setLong(i, task.id); + stmt.setLong(i, task.id()); stmt.executeUpdate(); } } @@ -99,6 +101,6 @@ public class TaskDao { rs.getTimestamp(5), rs.getInt(6), rs.getTimestamp(7), - arguments != null ? asList(arguments.split(" ")) : Collections.emptyList()); + arguments != null ? stringToArguments(arguments) : Collections.emptyList()); } } diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java index af8f644..68761f2 100644 --- a/src/main/java/io/trygvis/spring/DefaultConfig.java +++ b/src/main/java/io/trygvis/spring/DefaultConfig.java @@ -1,17 +1,21 @@ package io.trygvis.spring; import io.trygvis.async.AsyncService; -import io.trygvis.async.spring.SpringJdbcAsyncService; +import io.trygvis.queue.QueueService; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.transaction.support.TransactionTemplate; @Configuration public class DefaultConfig { @Bean - public AsyncService asyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { - return new SpringJdbcAsyncService(transactionTemplate, jdbcTemplate); + public AsyncService asyncService(JdbcTemplate jdbcTemplate) { + return new SpringJdbcAsyncService(jdbcTemplate); + } + + @Bean + public QueueService queueService(JdbcTemplate jdbcTemplate) { + return new SpringQueueService(jdbcTemplate); } } diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..6702642 --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java @@ -0,0 +1,90 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final JdbcTemplate jdbcTemplate; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final JdbcAsyncService jdbcAsyncService; + + public SpringJdbcAsyncService(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + jdbcAsyncService = new JdbcAsyncService(); + sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); + } + + @Transactional(propagation = REQUIRED) + public void registerQueue(final Queue queue, final AsyncService.AsyncCallable callable) { + jdbcAsyncService.registerQueue(sqlEffectExecutor, queue, callable); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + jdbcAsyncService.startQueue(executor, queue.name); + } + } + }); + + log.info("registerQueue: LEAVE"); + } + + public Queue getQueue(String name) { + return jdbcAsyncService.getQueue(name); + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, final List args) { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, queue, args); + } + }); + } + + public Task schedule(final long parent, final Queue queue, final List args) { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, parent, queue, args); + } + }); + } + + @Transactional(readOnly = true) + public Task update(final Task ref) { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.update(c, ref); + } + }); + } +} diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java new file mode 100644 index 0000000..3432e35 --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -0,0 +1,70 @@ +package io.trygvis.spring; + +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.PostConstruct; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.JdbcQueueService.createQueueService; + +public class SpringQueueService implements QueueService { + + public final JdbcTemplate jdbcTemplate; + + public JdbcQueueService queueService; + + public SpringQueueService(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + @PostConstruct + public void postConstruct() { + queueService = jdbcTemplate.execute(new ConnectionCallback() { + @Override + public JdbcQueueService doInConnection(Connection c) throws SQLException, DataAccessException { + return createQueueService(c); + } + }); + } + + @Transactional + public void consume(final Queue queue, final TaskEffect effect) throws SQLException { + jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Object doInConnection(Connection c) throws SQLException, DataAccessException { + queueService.consume(c, queue, effect); + return null; + } + }); + } + + @Transactional + public Queue getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException { + return jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Queue doInConnection(Connection c) throws SQLException, DataAccessException { + return queueService.getQueue(c, name, interval, autoCreate); + } + }); + } + + @Transactional + public void schedule(final Queue queue, final Date scheduled, final List arguments) throws SQLException { + jdbcTemplate.execute(new ConnectionCallback() { + @Override + public Object doInConnection(Connection c) throws SQLException, DataAccessException { + queueService.schedule(c, queue, scheduled, arguments); + return null; + } + }); + } +} diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java new file mode 100644 index 0000000..d0e5b47 --- /dev/null +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -0,0 +1,47 @@ +package io.trygvis.test; + +import com.jolbox.bonecp.BoneCPDataSource; +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; + +import javax.sql.DataSource; + +import java.io.PrintWriter; +import java.sql.SQLException; + +import static java.lang.System.getProperty; + +public class DbUtil { + public static DataSource createDataSource() throws SQLException { + String username = getProperty("user.name"); + String jdbcUrl = getProperty("database.url", "jdbc:postgresql://localhost/" + username); + String user = getProperty("database.username", username); + String pass = getProperty("database.password", username); + + return createDataSource(jdbcUrl, user, pass); + } + + public static DataSource createDataSource(String jdbcUrl, String username, String password) throws SQLException { + BoneCPDataSource ds = new BoneCPDataSource(); + + ds.setLogStatementsEnabled(true); + + ds.setJdbcUrl(jdbcUrl); + ds.setUsername(username); + ds.setPassword(password); + + ds.setDefaultAutoCommit(false); + ds.setIdleConnectionTestPeriodInSeconds(60); + ds.setIdleMaxAgeInSeconds(240); + ds.setMaxConnectionsPerPartition(40); + ds.setMinConnectionsPerPartition(0); + ds.setPartitionCount(1); + ds.setAcquireIncrement(1); + ds.setStatementsCacheSize(1000); + ds.setReleaseHelperThreads(3); + ds.setStatisticsEnabled(true); + ds.setLogStatementsEnabled(true); + ds.setLogWriter(new PrintWriter(System.err)); + return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); + } +} diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java index d274101..43ee971 100755 --- a/src/test/java/io/trygvis/test/Main.java +++ b/src/test/java/io/trygvis/test/Main.java @@ -2,6 +2,7 @@ package io.trygvis.test; import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; import io.trygvis.queue.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,9 @@ public class Main { @Autowired private AsyncService asyncService; + @Autowired + private QueueService queueService; + @Autowired @Qualifier("createArticle") private AsyncService.AsyncCallable createArticleCallable; @@ -67,7 +71,9 @@ public class Main { public void run() throws Exception { log.info("Main.run"); - final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable); + final Queue q = null; // queueService.getQueue(c, "create-article", 1); + + asyncService.registerQueue(q, createArticleCallable); // log.info("queue registered: ref = {}", q); // asyncService.registerQueue("update-queue", 1, updateArticleCallable); diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java new file mode 100644 index 0000000..338abad --- /dev/null +++ b/src/test/java/io/trygvis/test/PlainJavaExample.java @@ -0,0 +1,70 @@ +package io.trygvis.test; + +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.JdbcQueueService.createQueueService; +import static io.trygvis.queue.Task.newTask; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +public class PlainJavaExample { + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + Connection c = ds.getConnection(); + + JdbcQueueService queueService = createQueueService(c); + + final Queue input = queueService.getQueue(c, inputName, interval, true); + final Queue output = queueService.getQueue(c, outputName, interval, true); + + queueService.consume(c, input, new QueueService.TaskEffect() { + public List consume(Task task) throws Exception { + System.out.println("PlainJavaExample$Consumer.consume"); + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + return singletonList(newTask(output.name, new Date(), Long.toString(a + b))); + } + }); + + c.commit(); + } + } + + public static class Producer { + public static void main(String[] args) throws Exception { + System.out.println("Starting producer"); + + DataSource ds = createDataSource(); + Connection c = ds.getConnection(); + + JdbcQueueService queueService = createQueueService(c); + + Queue queue = queueService.getQueue(c, inputName, interval, true); + + queueService.schedule(c, queue, new Date(), asList("10", "20")); + + c.commit(); + } + } +} diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java index 9a7a436..07e67fb 100644 --- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -2,6 +2,7 @@ package io.trygvis.test.spring; import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; import io.trygvis.spring.DefaultConfig; import org.junit.Test; import org.junit.runner.RunWith; @@ -15,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.lang.System.getProperty; import static java.lang.System.setProperty; +import static java.util.Arrays.asList; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertNotNull; @@ -25,6 +27,9 @@ public class PlainSpringTest { @Autowired private AsyncService asyncService; + @Autowired + private QueueService queueService; + static { String username = getProperty("user.name"); setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); @@ -34,8 +39,9 @@ public class PlainSpringTest { @Test public void testBasic() throws SQLException, InterruptedException { + Queue test = queueService.getQueue("test", 10, true); final AtomicReference> ref = new AtomicReference<>(); - Queue test = asyncService.registerQueue("test", 10, new AsyncService.AsyncCallable() { + asyncService.registerQueue(test, new AsyncService.AsyncCallable() { public void run(List arguments) throws Exception { System.out.println("PlainSpringTest.run"); ref.set(arguments); @@ -47,7 +53,7 @@ public class PlainSpringTest { synchronized (ref) { System.out.println("Scheduling task"); - asyncService.schedule(test, "hello", "world"); + asyncService.schedule(test, asList("hello", "world")); System.out.println("Waiting"); ref.wait(1000); } diff --git a/src/test/java/io/trygvis/test/spring/TestConfig.java b/src/test/java/io/trygvis/test/spring/TestConfig.java index 7853cb5..a1d95c0 100755 --- a/src/test/java/io/trygvis/test/spring/TestConfig.java +++ b/src/test/java/io/trygvis/test/spring/TestConfig.java @@ -100,6 +100,7 @@ public class TestConfig { ds.setUsername(username); ds.setPassword(password); + ds.setDefaultAutoCommit(false); ds.setIdleConnectionTestPeriodInSeconds(60); ds.setIdleMaxAgeInSeconds(240); ds.setMaxConnectionsPerPartition(40); -- cgit v1.2.3