aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-04 20:54:56 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-04 20:54:56 +0200
commit7465fdb9aa847d29dacc56adbe473f1c1ceb298e (patch)
treed04b5c859fc090a57355e7bc0e51a043cddc907b
parent1eeef021c65c85c24d62a0cc1ee4a746a601beb5 (diff)
downloadquartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.gz
quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.bz2
quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.tar.xz
quartz-based-queue-7465fdb9aa847d29dacc56adbe473f1c1ceb298e.zip
o Creating a QueueService on top of the DAOs.
-rwxr-xr-xpom.xml2
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java13
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java39
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java91
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java17
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java39
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java14
-rw-r--r--src/main/java/io/trygvis/spring/DefaultConfig.java12
-rw-r--r--src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java (renamed from src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java)46
-rw-r--r--src/main/java/io/trygvis/spring/SpringQueueService.java70
-rw-r--r--src/test/java/io/trygvis/test/DbUtil.java47
-rwxr-xr-xsrc/test/java/io/trygvis/test/Main.java8
-rw-r--r--src/test/java/io/trygvis/test/PlainJavaExample.java70
-rw-r--r--src/test/java/io/trygvis/test/spring/PlainSpringTest.java10
-rwxr-xr-xsrc/test/java/io/trygvis/test/spring/TestConfig.java1
15 files changed, 397 insertions, 82 deletions
diff --git a/pom.xml b/pom.xml
index 2527f35..a86ce8d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
<dependency>
<groupId>com.jolbox</groupId>
- <artifactId>bonecp-spring</artifactId>
+ <artifactId>bonecp</artifactId>
<version>0.7.1.RELEASE</version>
<scope>test</scope>
</dependency>
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index 57c1af8..17d53e9 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -3,7 +3,6 @@ package io.trygvis.async;
import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
-import java.sql.SQLException;
import java.util.List;
/**
@@ -11,19 +10,13 @@ import java.util.List;
*/
public interface AsyncService {
- /**
- * @param name
- * @param interval how often the queue should be polled for missed tasks in seconds.
- * @param callable
- * @return
- */
- Queue registerQueue(final String name, final int interval, AsyncCallable callable);
+ void registerQueue(Queue queue, final AsyncService.AsyncCallable callable);
Queue getQueue(String name);
- Task schedule(Queue queue, String... args);
+ Task schedule(Queue queue, List<String> args);
- Task schedule(long parent, Queue queue, String... args);
+ Task schedule(long parent, Queue queue, List<String> args);
/**
* Polls for a new state of the execution.
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index c34330e..310c59b 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,7 +1,6 @@
package io.trygvis.async;
import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueDao;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
@@ -11,12 +10,12 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
public class JdbcAsyncService {
@@ -24,25 +23,12 @@ public class JdbcAsyncService {
private final Map<String, QueueThread> queues = new HashMap<>();
- public Queue registerQueue(Connection c, SqlEffectExecutor sqlEffectExecutor, final String name, final int interval, AsyncService.AsyncCallable callable) throws SQLException {
- QueueDao queueDao = new QueueDao(c);
+ public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) {
+ final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue);
- log.info("registerQueue: ENTER");
-
- Queue q = queueDao.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- queueDao.insert(q);
- }
-
- final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, q);
- queues.put(name, queueThread);
+ queues.put(queue.name, queueThread);
log.info("registerQueue: LEAVE");
- return q;
}
public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
@@ -74,26 +60,21 @@ public class JdbcAsyncService {
return queueThread.queue;
}
- public Task schedule(Connection c, final Queue queue, String... args) throws SQLException {
+ public Task schedule(Connection c, final Queue queue, List<String> args) throws SQLException {
return scheduleInner(c, null, queue, args);
}
- public Task schedule(Connection c, long parent, Queue queue, String... args) throws SQLException {
+ public Task schedule(Connection c, long parent, Queue queue, List<String> args) throws SQLException {
return scheduleInner(c, parent, queue, args);
}
- private Task scheduleInner(Connection c, Long parent, final Queue queue, String... args) throws SQLException {
+ private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
TaskDao taskDao = new TaskDao(c);
Date scheduled = new Date();
- StringBuilder arguments = new StringBuilder();
- for (String arg : args) {
- arguments.append(arg).append(' ');
- }
-
- long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, asList(args));
+ long id = taskDao.insert(parent, queue.name, scheduled, args);
+ Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args);
log.info("Created task = {}", task);
return task;
@@ -123,6 +104,6 @@ public class JdbcAsyncService {
public Task update(Connection c, Task ref) throws SQLException {
TaskDao taskDao = new TaskDao(c);
- return taskDao.findById(ref.id);
+ return taskDao.findById(ref.id());
}
}
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
new file mode 100644
index 0000000..793333d
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -0,0 +1,91 @@
+package io.trygvis.queue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public class JdbcQueueService {
+
+ private Logger log = LoggerFactory.getLogger(getClass());
+
+ private JdbcQueueService(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+
+ public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException {
+ TaskDao taskDao = createTaskDao(c);
+
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+ log.trace("Got {} tasks.", tasks.size());
+
+ for (Task task : tasks) {
+ log.trace("Executing task {}", task.id());
+ try {
+ List<Task> newTasks = effect.consume(task);
+ log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size());
+
+ Date now = new Date();
+
+ task = task.registerComplete(now);
+
+ for (Task newTask : newTasks) {
+ taskDao.insert(task.id(), newTask.queue, now, newTask.arguments);
+ }
+
+ taskDao.update(task);
+ } catch (Throwable e) {
+ log.error("Unable to execute task, id=" + task.id(), e);
+ }
+ c.commit();
+ }
+ }
+
+ public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
+ QueueDao queueDao = createQueueDao(c);
+
+ Queue q = queueDao.findByName(name);
+
+ if (q == null) {
+ if (!autoCreate) {
+ throw new SQLException("No such queue: '" + name + "'.");
+ }
+
+ q = new Queue(name, interval);
+ queueDao.insert(q);
+ }
+
+ return q;
+ }
+
+ public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = createTaskDao(c);
+
+ taskDao.insert(queue.name, scheduled, arguments);
+ }
+
+ public static JdbcQueueService createQueueService(Connection c) throws SQLException {
+ return new JdbcQueueService(c);
+ }
+
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
new file mode 100644
index 0000000..2111013
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -0,0 +1,17 @@
+package io.trygvis.queue;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public interface QueueService {
+ void consume(Queue queue, TaskEffect effect) throws SQLException;
+
+ Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException;
+
+ void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
+
+ public static interface TaskEffect {
+ List<Task> consume(Task task) throws Exception;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
index 2b1103b..1af40d7 100755
--- a/src/main/java/io/trygvis/queue/Task.java
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -3,9 +3,11 @@ package io.trygvis.queue;
import java.util.Date;
import java.util.List;
+import static java.util.Arrays.asList;
+
public class Task {
- public final long id;
+ private final long id;
public final Long parent;
@@ -54,7 +56,42 @@ public class Task {
'}';
}
+ public long id() {
+ if (id == 0) {
+ throw new RuntimeException("This task has not been persisted yet.");
+ }
+
+ return id;
+ }
+
public boolean isDone() {
return completed != null;
}
+
+ public static Task newTask(String name, Date scheduled, String... arguments) {
+ return new Task(0, 0l, name, scheduled, null, 0, null, asList(arguments));
+ }
+
+ public static Task newTask(String name, Date scheduled, List<String> arguments) {
+ return new Task(0, 0l, name, scheduled, null, 0, null, arguments);
+ }
+
+ public static List<String> stringToArguments(String arguments) {
+ return asList(arguments.split(","));
+ }
+
+ public static String argumentsToString(List<String> arguments) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) {
+ String argument = arguments.get(i);
+ if (argument.contains(",")) {
+ throw new RuntimeException("The argument string can't contain a comma.");
+ }
+ if (i > 0) {
+ builder.append(',');
+ }
+ builder.append(argument);
+ }
+ return builder.toString();
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 5459933..3aa2ac2 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -11,7 +11,8 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
-import static java.util.Arrays.asList;
+import static io.trygvis.queue.Task.argumentsToString;
+import static io.trygvis.queue.Task.stringToArguments;
public class TaskDao {
@@ -23,11 +24,11 @@ public class TaskDao {
this.connection = connection;
}
- public long insert(String queue, Date scheduled, String arguments) throws SQLException {
+ public long insert(String queue, Date scheduled, List<String> arguments) throws SQLException {
return insert(null, queue, scheduled, arguments);
}
- public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException {
+ public long insert(Long parent, String queue, Date scheduled, List<String> arguments) throws SQLException {
String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
"VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
@@ -39,7 +40,7 @@ public class TaskDao {
}
stmt.setString(i++, queue);
stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
- stmt.setString(i, arguments);
+ stmt.setString(i, argumentsToString(arguments));
stmt.executeUpdate();
}
try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) {
@@ -51,6 +52,7 @@ public class TaskDao {
public Task findById(long id) throws SQLException {
try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ stmt.setLong(1, id);
ResultSet rs = stmt.executeQuery();
return rs.next() ? mapRow(rs) : null;
}
@@ -76,7 +78,7 @@ public class TaskDao {
setTimestamp(stmt, i++, task.lastRun);
stmt.setInt(i++, task.runCount);
setTimestamp(stmt, i++, task.completed);
- stmt.setLong(i, task.id);
+ stmt.setLong(i, task.id());
stmt.executeUpdate();
}
}
@@ -99,6 +101,6 @@ public class TaskDao {
rs.getTimestamp(5),
rs.getInt(6),
rs.getTimestamp(7),
- arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList());
+ arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
}
}
diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java
index af8f644..68761f2 100644
--- a/src/main/java/io/trygvis/spring/DefaultConfig.java
+++ b/src/main/java/io/trygvis/spring/DefaultConfig.java
@@ -1,17 +1,21 @@
package io.trygvis.spring;
import io.trygvis.async.AsyncService;
-import io.trygvis.async.spring.SpringJdbcAsyncService;
+import io.trygvis.queue.QueueService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.transaction.support.TransactionTemplate;
@Configuration
public class DefaultConfig {
@Bean
- public AsyncService asyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
- return new SpringJdbcAsyncService(transactionTemplate, jdbcTemplate);
+ public AsyncService asyncService(JdbcTemplate jdbcTemplate) {
+ return new SpringJdbcAsyncService(jdbcTemplate);
+ }
+
+ @Bean
+ public QueueService queueService(JdbcTemplate jdbcTemplate) {
+ return new SpringQueueService(jdbcTemplate);
}
}
diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
index 327dffa..6702642 100644
--- a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -1,4 +1,4 @@
-package io.trygvis.async.spring;
+package io.trygvis.spring;
import io.trygvis.async.AsyncService;
import io.trygvis.async.JdbcAsyncService;
@@ -12,10 +12,10 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionTemplate;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -27,42 +27,32 @@ public class SpringJdbcAsyncService implements AsyncService {
private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
- private final TransactionTemplate transactionTemplate;
-
private final JdbcTemplate jdbcTemplate;
- private SqlEffectExecutor sqlEffectExecutor;
+ private final SqlEffectExecutor sqlEffectExecutor;
- final JdbcAsyncService jdbcAsyncService;
+ private final JdbcAsyncService jdbcAsyncService;
- public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) {
- this.transactionTemplate = transactionTemplate;
+ public SpringJdbcAsyncService(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
jdbcAsyncService = new JdbcAsyncService();
sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource());
}
@Transactional(propagation = REQUIRED)
- public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) {
- return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
- @Override
- public Queue doInConnection(Connection c) throws SQLException {
-
- Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable);
-
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- log.info("Transaction completed with status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- jdbcAsyncService.startQueue(executor, name);
- }
- }
- });
-
- log.info("registerQueue: LEAVE");
- return q;
+ public void registerQueue(final Queue queue, final AsyncService.AsyncCallable callable) {
+ jdbcAsyncService.registerQueue(sqlEffectExecutor, queue, callable);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("Transaction completed with status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ jdbcAsyncService.startQueue(executor, queue.name);
+ }
}
});
+
+ log.info("registerQueue: LEAVE");
}
public Queue getQueue(String name) {
@@ -70,7 +60,7 @@ public class SpringJdbcAsyncService implements AsyncService {
}
@Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, final String... args) {
+ public Task schedule(final Queue queue, final List<String> args) {
return jdbcTemplate.execute(new ConnectionCallback<Task>() {
@Override
public Task doInConnection(Connection c) throws SQLException {
@@ -79,7 +69,7 @@ public class SpringJdbcAsyncService implements AsyncService {
});
}
- public Task schedule(final long parent, final Queue queue, final String... args) {
+ public Task schedule(final long parent, final Queue queue, final List<String> args) {
return jdbcTemplate.execute(new ConnectionCallback<Task>() {
@Override
public Task doInConnection(Connection c) throws SQLException {
diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java
new file mode 100644
index 0000000..3432e35
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/SpringQueueService.java
@@ -0,0 +1,70 @@
+package io.trygvis.spring;
+
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueService;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.PostConstruct;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.JdbcQueueService.createQueueService;
+
+public class SpringQueueService implements QueueService {
+
+ public final JdbcTemplate jdbcTemplate;
+
+ public JdbcQueueService queueService;
+
+ public SpringQueueService(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ @PostConstruct
+ public void postConstruct() {
+ queueService = jdbcTemplate.execute(new ConnectionCallback<JdbcQueueService>() {
+ @Override
+ public JdbcQueueService doInConnection(Connection c) throws SQLException, DataAccessException {
+ return createQueueService(c);
+ }
+ });
+ }
+
+ @Transactional
+ public void consume(final Queue queue, final TaskEffect effect) throws SQLException {
+ jdbcTemplate.execute(new ConnectionCallback<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException, DataAccessException {
+ queueService.consume(c, queue, effect);
+ return null;
+ }
+ });
+ }
+
+ @Transactional
+ public Queue getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException {
+ return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
+ @Override
+ public Queue doInConnection(Connection c) throws SQLException, DataAccessException {
+ return queueService.getQueue(c, name, interval, autoCreate);
+ }
+ });
+ }
+
+ @Transactional
+ public void schedule(final Queue queue, final Date scheduled, final List<String> arguments) throws SQLException {
+ jdbcTemplate.execute(new ConnectionCallback<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException, DataAccessException {
+ queueService.schedule(c, queue, scheduled, arguments);
+ return null;
+ }
+ });
+ }
+}
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
new file mode 100644
index 0000000..d0e5b47
--- /dev/null
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -0,0 +1,47 @@
+package io.trygvis.test;
+
+import com.jolbox.bonecp.BoneCPDataSource;
+import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
+import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
+
+import javax.sql.DataSource;
+
+import java.io.PrintWriter;
+import java.sql.SQLException;
+
+import static java.lang.System.getProperty;
+
+public class DbUtil {
+ public static DataSource createDataSource() throws SQLException {
+ String username = getProperty("user.name");
+ String jdbcUrl = getProperty("database.url", "jdbc:postgresql://localhost/" + username);
+ String user = getProperty("database.username", username);
+ String pass = getProperty("database.password", username);
+
+ return createDataSource(jdbcUrl, user, pass);
+ }
+
+ public static DataSource createDataSource(String jdbcUrl, String username, String password) throws SQLException {
+ BoneCPDataSource ds = new BoneCPDataSource();
+
+ ds.setLogStatementsEnabled(true);
+
+ ds.setJdbcUrl(jdbcUrl);
+ ds.setUsername(username);
+ ds.setPassword(password);
+
+ ds.setDefaultAutoCommit(false);
+ ds.setIdleConnectionTestPeriodInSeconds(60);
+ ds.setIdleMaxAgeInSeconds(240);
+ ds.setMaxConnectionsPerPartition(40);
+ ds.setMinConnectionsPerPartition(0);
+ ds.setPartitionCount(1);
+ ds.setAcquireIncrement(1);
+ ds.setStatementsCacheSize(1000);
+ ds.setReleaseHelperThreads(3);
+ ds.setStatisticsEnabled(true);
+ ds.setLogStatementsEnabled(true);
+ ds.setLogWriter(new PrintWriter(System.err));
+ return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
+ }
+}
diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java
index d274101..43ee971 100755
--- a/src/test/java/io/trygvis/test/Main.java
+++ b/src/test/java/io/trygvis/test/Main.java
@@ -2,6 +2,7 @@ package io.trygvis.test;
import io.trygvis.async.AsyncService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,9 @@ public class Main {
private AsyncService asyncService;
@Autowired
+ private QueueService queueService;
+
+ @Autowired
@Qualifier("createArticle")
private AsyncService.AsyncCallable createArticleCallable;
@@ -67,7 +71,9 @@ public class Main {
public void run() throws Exception {
log.info("Main.run");
- final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable);
+ final Queue q = null; // queueService.getQueue(c, "create-article", 1);
+
+ asyncService.registerQueue(q, createArticleCallable);
// log.info("queue registered: ref = {}", q);
// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java
new file mode 100644
index 0000000..338abad
--- /dev/null
+++ b/src/test/java/io/trygvis/test/PlainJavaExample.java
@@ -0,0 +1,70 @@
+package io.trygvis.test;
+
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.JdbcQueueService.createQueueService;
+import static io.trygvis.queue.Task.newTask;
+import static io.trygvis.test.DbUtil.createDataSource;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+public class PlainJavaExample {
+
+ private static String inputName = "my-input";
+ private static String outputName = "my-output";
+
+ private static int interval = 10;
+
+ public static class Consumer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
+
+ DataSource ds = createDataSource();
+ Connection c = ds.getConnection();
+
+ JdbcQueueService queueService = createQueueService(c);
+
+ final Queue input = queueService.getQueue(c, inputName, interval, true);
+ final Queue output = queueService.getQueue(c, outputName, interval, true);
+
+ queueService.consume(c, input, new QueueService.TaskEffect() {
+ public List<Task> consume(Task task) throws Exception {
+ System.out.println("PlainJavaExample$Consumer.consume");
+ Long a = Long.valueOf(task.arguments.get(0));
+ Long b = Long.valueOf(task.arguments.get(1));
+
+ System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
+
+ return singletonList(newTask(output.name, new Date(), Long.toString(a + b)));
+ }
+ });
+
+ c.commit();
+ }
+ }
+
+ public static class Producer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting producer");
+
+ DataSource ds = createDataSource();
+ Connection c = ds.getConnection();
+
+ JdbcQueueService queueService = createQueueService(c);
+
+ Queue queue = queueService.getQueue(c, inputName, interval, true);
+
+ queueService.schedule(c, queue, new Date(), asList("10", "20"));
+
+ c.commit();
+ }
+ }
+}
diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
index 9a7a436..07e67fb 100644
--- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
+++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
@@ -2,6 +2,7 @@ package io.trygvis.test.spring;
import io.trygvis.async.AsyncService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueService;
import io.trygvis.spring.DefaultConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -15,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.lang.System.getProperty;
import static java.lang.System.setProperty;
+import static java.util.Arrays.asList;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
@@ -25,6 +27,9 @@ public class PlainSpringTest {
@Autowired
private AsyncService asyncService;
+ @Autowired
+ private QueueService queueService;
+
static {
String username = getProperty("user.name");
setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
@@ -34,8 +39,9 @@ public class PlainSpringTest {
@Test
public void testBasic() throws SQLException, InterruptedException {
+ Queue test = queueService.getQueue("test", 10, true);
final AtomicReference<List<String>> ref = new AtomicReference<>();
- Queue test = asyncService.registerQueue("test", 10, new AsyncService.AsyncCallable() {
+ asyncService.registerQueue(test, new AsyncService.AsyncCallable() {
public void run(List<String> arguments) throws Exception {
System.out.println("PlainSpringTest.run");
ref.set(arguments);
@@ -47,7 +53,7 @@ public class PlainSpringTest {
synchronized (ref) {
System.out.println("Scheduling task");
- asyncService.schedule(test, "hello", "world");
+ asyncService.schedule(test, asList("hello", "world"));
System.out.println("Waiting");
ref.wait(1000);
}
diff --git a/src/test/java/io/trygvis/test/spring/TestConfig.java b/src/test/java/io/trygvis/test/spring/TestConfig.java
index 7853cb5..a1d95c0 100755
--- a/src/test/java/io/trygvis/test/spring/TestConfig.java
+++ b/src/test/java/io/trygvis/test/spring/TestConfig.java
@@ -100,6 +100,7 @@ public class TestConfig {
ds.setUsername(username);
ds.setPassword(password);
+ ds.setDefaultAutoCommit(false);
ds.setIdleConnectionTestPeriodInSeconds(60);
ds.setIdleMaxAgeInSeconds(240);
ds.setMaxConnectionsPerPartition(40);