package io.trygvis.spring; import io.trygvis.async.AsyncService; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.QueueController; import io.trygvis.queue.SqlEffect; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.springframework.transaction.annotation.Propagation.REQUIRED; import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; public class SpringJdbcAsyncService implements AsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); private final ScheduledThreadPoolExecutor executor; private final JdbcTemplate jdbcTemplate; private final JdbcAsyncService jdbcAsyncService; private final JdbcQueueService queueService; private final QueueSystem queueSystem; public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { this.queueSystem = queueSystem; this.jdbcTemplate = jdbcTemplate; jdbcAsyncService = new JdbcAsyncService(queueSystem); queueService = queueSystem.createQueueService(); executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); } @Transactional(propagation = REQUIRED) public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException { QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect() { @Override public QueueExecutor doInConnection(Connection c) throws SQLException { return queueService.lookupQueue(c, queue.name, queue.interval, true); } }); final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { queueController.start(executor); } } }); return queueController; } public QueueExecutor getQueue(String name) { return jdbcAsyncService.getQueue(name); } @Transactional(propagation = REQUIRED) public Task schedule(final Queue queue, final Date scheduled, final List args) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { QueueExecutor queueExecutor = queueService.getQueue(queue.name); return queueExecutor.schedule(c, scheduled, args); } }); } public Task schedule(final Queue queue, final long parent, final Date scheduled, final List args) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { QueueExecutor queueExecutor = queueService.getQueue(queue.name); return queueExecutor.schedule(c, parent, scheduled, args); } }); } @Transactional(readOnly = true) public Task update(final Task ref) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { return jdbcAsyncService.update(c, ref); } }); } }