package io.trygvis.spring; import io.trygvis.async.AsyncService; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import java.sql.Connection; import java.sql.SQLException; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.springframework.transaction.annotation.Propagation.REQUIRED; import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; public class SpringJdbcAsyncService implements AsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); private final JdbcTemplate jdbcTemplate; private final JdbcAsyncService jdbcAsyncService; public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; jdbcAsyncService = new JdbcAsyncService(queueSystem); } @Transactional(propagation = REQUIRED) public void registerQueue(final Queue queue, final TaskEffect processor) { jdbcAsyncService.registerQueue(queue, processor); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { jdbcAsyncService.startQueue(executor, queue.name); } } }); log.info("registerQueue: LEAVE"); } public Queue getQueue(String name) { return jdbcAsyncService.getQueue(name); } @Transactional(propagation = REQUIRED) public Task schedule(final Queue queue, final List args) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { return jdbcAsyncService.schedule(c, queue, args); } }); } public Task schedule(final long parent, final Queue queue, final List args) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { return jdbcAsyncService.schedule(c, parent, queue, args); } }); } @Transactional(readOnly = true) public Task update(final Task ref) { return jdbcTemplate.execute(new ConnectionCallback() { @Override public Task doInConnection(Connection c) throws SQLException { return jdbcAsyncService.update(c, ref); } }); } }