aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/spring
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/spring')
-rw-r--r--src/main/java/io/trygvis/spring/DefaultConfig.java18
-rw-r--r--src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java13
-rw-r--r--src/main/java/io/trygvis/spring/SpringQueueService.java32
3 files changed, 29 insertions, 34 deletions
diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java
index 68761f2..3ba28de 100644
--- a/src/main/java/io/trygvis/spring/DefaultConfig.java
+++ b/src/main/java/io/trygvis/spring/DefaultConfig.java
@@ -1,21 +1,31 @@
package io.trygvis.spring;
import io.trygvis.async.AsyncService;
+import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
+import javax.sql.DataSource;
+import java.sql.SQLException;
+
@Configuration
public class DefaultConfig {
@Bean
- public AsyncService asyncService(JdbcTemplate jdbcTemplate) {
- return new SpringJdbcAsyncService(jdbcTemplate);
+ public QueueSystem queueSystem(DataSource ds) throws SQLException {
+ return QueueSystem.initialize(new SqlEffectExecutor(ds));
+ }
+
+ @Bean
+ public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringJdbcAsyncService(queueSystem, jdbcTemplate);
}
@Bean
- public QueueService queueService(JdbcTemplate jdbcTemplate) {
- return new SpringQueueService(jdbcTemplate);
+ public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringQueueService(queueSystem, jdbcTemplate);
}
}
diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
index 6702642..96442e6 100644
--- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -4,7 +4,9 @@ import io.trygvis.async.AsyncService;
import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.ConnectionCallback;
@@ -29,19 +31,16 @@ public class SpringJdbcAsyncService implements AsyncService {
private final JdbcTemplate jdbcTemplate;
- private final SqlEffectExecutor sqlEffectExecutor;
-
private final JdbcAsyncService jdbcAsyncService;
- public SpringJdbcAsyncService(JdbcTemplate jdbcTemplate) {
+ public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
- jdbcAsyncService = new JdbcAsyncService();
- sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource());
+ jdbcAsyncService = new JdbcAsyncService(queueSystem);
}
@Transactional(propagation = REQUIRED)
- public void registerQueue(final Queue queue, final AsyncService.AsyncCallable callable) {
- jdbcAsyncService.registerQueue(sqlEffectExecutor, queue, callable);
+ public void registerQueue(final Queue queue, final TaskEffect processor) {
+ jdbcAsyncService.registerQueue(queue, processor);
registerSynchronization(new TransactionSynchronizationAdapter() {
public void afterCompletion(int status) {
diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java
index 3432e35..21746e5 100644
--- a/src/main/java/io/trygvis/spring/SpringQueueService.java
+++ b/src/main/java/io/trygvis/spring/SpringQueueService.java
@@ -3,48 +3,34 @@ package io.trygvis.spring;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.TaskEffect;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
-import javax.annotation.PostConstruct;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
-import static io.trygvis.queue.JdbcQueueService.createQueueService;
-
public class SpringQueueService implements QueueService {
public final JdbcTemplate jdbcTemplate;
public JdbcQueueService queueService;
- public SpringQueueService(JdbcTemplate jdbcTemplate) {
+ public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
+ this.queueService = queueSystem.queueService;
}
- @PostConstruct
- public void postConstruct() {
- queueService = jdbcTemplate.execute(new ConnectionCallback<JdbcQueueService>() {
- @Override
- public JdbcQueueService doInConnection(Connection c) throws SQLException, DataAccessException {
- return createQueueService(c);
- }
- });
- }
-
- @Transactional
+ /**
+ * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.TaskEffect)
+ */
public void consume(final Queue queue, final TaskEffect effect) throws SQLException {
- jdbcTemplate.execute(new ConnectionCallback<Object>() {
- @Override
- public Object doInConnection(Connection c) throws SQLException, DataAccessException {
- queueService.consume(c, queue, effect);
- return null;
- }
- });
+ queueService.consumeAll(queue, effect);
}
@Transactional
@@ -52,7 +38,7 @@ public class SpringQueueService implements QueueService {
return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
@Override
public Queue doInConnection(Connection c) throws SQLException, DataAccessException {
- return queueService.getQueue(c, name, interval, autoCreate);
+ return queueService.lookupQueue(c, name, interval, autoCreate);
}
});
}