package io.trygvis.queue;

import io.trygvis.data.QueueRepository;
import io.trygvis.data.TaskRepository;
import io.trygvis.model.Queue;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.JobPersistenceException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.*;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.RollbackException;
import javax.persistence.TypedQuery;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;

import static org.quartz.SimpleScheduleBuilder.simpleSchedule;

/**
 * {@link AsyncService} implementation that persists queue definitions via JPA/Spring Data
 * and drives them with an embedded Quartz scheduler whose job store borrows connections
 * from the application's Spring-managed {@link DataSource}.
 */
@SuppressWarnings("SpringJavaAutowiringInspection")
@Component
public class JpaAsyncService implements AsyncService, ApplicationContextAware {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @PersistenceContext
    private EntityManager entityManager;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private QueueRepository queueRepository;

    @Autowired
    private TaskRepository taskRepository;

    private Scheduler scheduler;

    // Quartz instantiates JpaDataSourceJobStore reflectively with a no-arg constructor,
    // so these collaborators must be reachable statically. NOTE(review): this assumes a
    // single JpaAsyncService instance per JVM — a second instance would clobber them.
    private static DataSource dataSourceStatic;
    private static PlatformTransactionManager transactionManagerStatic;

    /**
     * State shared with Quartz jobs through the scheduler context under the key "context".
     */
    private static class Context {
        ApplicationContext applicationContext;
        // Append-only: jobs address callables by list index stored in their job data map,
        // so existing indices must never shift.
        List<AsyncCallable> callables = new ArrayList<>();
    }

    private final Context context = new Context();

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context.applicationContext = applicationContext;
    }

    /**
     * Builds, configures and starts the Quartz scheduler. Job state is persisted through
     * {@link JpaDataSourceJobStore} using a PostgreSQL delegate.
     */
    @PostConstruct
    public void postConstruct() throws Exception {
        transactionManagerStatic = transactionManager;
        dataSourceStatic = dataSource;

        log.info("afterPropertiesSet!!");

        Properties quartzProperties = new Properties();
        quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "queue");
        quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
        // The original also set PROP_JOB_STORE_CLASS to JobStoreTX first; that value was
        // dead — it was immediately overwritten by the line below.
        quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName());
        quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_LOCK_HANDLER_CLASS, SimpleSemaphore.class.getName());
        quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10");
        quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
        quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");

        SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties);
        scheduler = schedulerFactory.getScheduler();
        scheduler.getContext().put("context", context);
        scheduler.start();
    }

    @PreDestroy
    public void preDestroy() throws Exception {
        scheduler.shutdown();
    }

    /**
     * Registers (or updates) the named queue and schedules a durable, repeating Quartz job
     * that invokes {@code callable} every {@code interval} seconds.
     *
     * @param name     unique queue name; also used to derive the Quartz job/trigger keys
     * @param interval polling interval in seconds
     * @param callable work to execute on each tick; looked up by index at execution time
     * @throws SchedulerException if the job or trigger cannot be (re)scheduled
     */
    @Transactional
    public synchronized void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
        Queue q = queueRepository.findByName(name);

        if (q == null) {
            queueRepository.save(new Queue(name, interval));
        } else if (interval != q.getInterval()) {
            // Only touch the row when the interval actually changed.
            q.setInterval(interval);
            queueRepository.save(q);
        }

        context.callables.add(callable);
        int index = context.callables.size() - 1;

        JobDetailImpl jobDetail = new JobDetailImpl();
        JobKey jobKey = JobKey.jobKey("queue-" + name);
        jobDetail.setKey(jobKey);
        jobDetail.setJobClass(AsyncServiceJob.class);
        jobDetail.setDurability(true);
        // The job data map is persisted by Quartz; the index is resolved against the
        // in-memory callable list on every execution.
        jobDetail.getJobDataMap().put("index", Integer.toString(index));
        scheduler.addJob(jobDetail, true);

        SimpleScheduleBuilder schedule = simpleSchedule().
                repeatForever().
                withIntervalInSeconds(interval);

        SimpleTrigger trigger = TriggerBuilder.newTrigger().
                withSchedule(schedule).
                withIdentity(jobKey.getName()).
                forJob(jobKey).
                build();

        // Drop any previously persisted trigger so a changed interval takes effect.
        if (scheduler.checkExists(trigger.getKey())) {
            scheduler.unscheduleJob(trigger.getKey());
        }
        scheduler.scheduleJob(trigger);
    }

    /**
     * Looks up a queue by name and returns a detached reference to it.
     *
     * @throws RollbackException if no queue with the given name exists
     */
    @Transactional(readOnly = true)
    public JpaQueueRef getQueue(String name) {
        TypedQuery<Queue> query = entityManager.createQuery(
                "select q from io.trygvis.model.Queue q where name = ?1", Queue.class);
        query.setParameter(1, name);
        List<Queue> list = query.getResultList();
        log.debug("getQueue: {} result(s) for name '{}'", list.size(), name);
        if (list.isEmpty()) {
            throw new RollbackException("No such queue: '" + name + "'.");
        }
        Queue queue = list.get(0);
        // BUG FIX: the original called entityManager.detach(query), passing the
        // TypedQuery — which is not an entity and makes detach() throw
        // IllegalArgumentException. The intent is to detach the loaded entity.
        entityManager.detach(queue);
        return new JpaQueueRef(queue);
    }

    @Transactional
    public JpaExecutionRef schedule(JpaQueueRef queue) {
        // TODO(review): not implemented yet — callers currently receive null.
        return null;
    }

    static class JpaQueueRef implements AsyncService.QueueRef {
        public final Queue queue;

        JpaQueueRef(Queue queue) {
            this.queue = queue;
        }
    }

    static class JpaExecutionRef implements AsyncService.ExecutionRef {
    }

    /**
     * Quartz job that fetches the shared {@link Context} from the scheduler context,
     * resolves its callable by the "index" entry of the merged job data map, and runs it.
     * Failures are logged and swallowed so the trigger keeps firing.
     */
    public static class AsyncServiceJob implements Job {
        private final Logger log = LoggerFactory.getLogger(getClass());

        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            try {
                log.info("Running");
                Context context = (Context) jobExecutionContext.getScheduler().getContext().get("context");

                for (Map.Entry<String, Object> entry : jobExecutionContext.getMergedJobDataMap().entrySet()) {
                    log.info("{} = {}, class= {}", entry.getKey(), entry.getValue(),
                            entry.getValue() != null ? entry.getValue().getClass() : "");
                }

                int index = jobExecutionContext.getMergedJobDataMap().getIntFromString("index");
                AsyncCallable callable = context.callables.get(index);
                log.info("Calling {}", callable);
                callable.run();
            } catch (Throwable e) {
                // Deliberate swallow: a failing callable must not unschedule the job;
                // the next tick retries.
                log.warn("fail", e);
            }
        }
    }

    /**
     * Quartz job store that obtains its non-managed-transaction connections from the
     * Spring {@link DataSource} captured in {@code dataSourceStatic} instead of a
     * Quartz-owned connection pool.
     */
    public static class JpaDataSourceJobStore extends JobStoreSupport {
        private static final Logger log = LoggerFactory.getLogger(JpaDataSourceJobStore.class);

        public JpaDataSourceJobStore() {
            // JobStoreSupport insists on a data source name even though connections come
            // from dataSourceStatic; the value itself is never used to look anything up.
            setDataSource("wat");
        }

        protected Connection getNonManagedTXConnection() throws JobPersistenceException {
            Connection c = DataSourceUtils.getConnection(dataSourceStatic);
            try {
                // Quartz drives commit/rollback itself on non-managed connections.
                c.setAutoCommit(false);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
            return c;
        }

        protected void closeConnection(Connection c) {
            try {
                DataSourceUtils.doCloseConnection(c, dataSourceStatic);
            } catch (SQLException e) {
                // Was e.printStackTrace(); route through the logger instead.
                log.warn("Failed to close connection", e);
            }
        }

        protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {
            return executeInNonManagedTXLock(lockName, txCallback);
        }
    }
}