diff options
Diffstat (limited to 'src/main/java/io/trygvis/async/spring')
-rw-r--r-- | src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..8517c68 --- /dev/null +++ b/src/main/java/io/trygvis/async/spring/SpringJdbcAsyncService.java @@ -0,0 +1,102 @@ +package io.trygvis.async.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.annotation.PostConstruct; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final TransactionTemplate transactionTemplate; + + private final JdbcTemplate jdbcTemplate; + + private SqlEffectExecutor sqlEffectExecutor; + + final JdbcAsyncService jdbcAsyncService; + + public SpringJdbcAsyncService(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate) { + this.transactionTemplate = transactionTemplate; + this.jdbcTemplate = jdbcTemplate; + jdbcAsyncService = new JdbcAsyncService(); + sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); + } + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, final AsyncService.AsyncCallable callable) { + return jdbcTemplate.execute(new ConnectionCallback<Queue>() { + @Override + public Queue doInConnection(Connection c) throws SQLException { + + Queue q = jdbcAsyncService.registerQueue(c, sqlEffectExecutor, name, interval, callable); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + jdbcAsyncService.startQueue(executor, name); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + }); + } + + public Queue getQueue(String name) { + return jdbcAsyncService.getQueue(name); + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, final String... args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, queue, args); + } + }); + } + + public Task schedule(final long parent, final Queue queue, final String... args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.schedule(c, parent, queue, args); + } + }); + } + + @Transactional(readOnly = true) + public Task update(final Task ref) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.update(c, ref); + } + }); + } +} |