From cd99d57cccb88ea8a058eca530d62a81a665983c Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Thu, 11 Apr 2013 10:13:44 +0200 Subject: o Initial import. --- src/main/java/io/trygvis/queue/AsyncService.java | 18 ++ .../java/io/trygvis/queue/JpaAsyncService.java | 246 +++++++++++++++++++++ 2 files changed, 264 insertions(+) create mode 100755 src/main/java/io/trygvis/queue/AsyncService.java create mode 100755 src/main/java/io/trygvis/queue/JpaAsyncService.java (limited to 'src/main/java/io/trygvis/queue') diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java new file mode 100755 index 0000000..dcbe991 --- /dev/null +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -0,0 +1,18 @@ +package io.trygvis.queue; + +import org.quartz.*; + +public interface AsyncService { + + void registerQueue(String name, int interval, Class klass) throws SchedulerException; + + QueueRef getQueue(String name); + + ExecutionRef schedule(QueueRef queue); + + interface QueueRef { + } + + interface ExecutionRef { + } +} diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java new file mode 100755 index 0000000..2d6c2df --- /dev/null +++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java @@ -0,0 +1,246 @@ +package io.trygvis.queue; + +import io.trygvis.data.QueueRepository; +import io.trygvis.data.TaskRepository; +import io.trygvis.model.Queue; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobKey; +import org.quartz.JobPersistenceException; +import org.quartz.Scheduler; +import org.quartz.SchedulerContext; +import org.quartz.SchedulerException; +import org.quartz.SchedulerFactory; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.SimpleTrigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.DirectSchedulerFactory; +import org.quartz.impl.JobDetailImpl; +import org.quartz.impl.StdSchedulerFactory; +import org.quartz.impl.jdbcjobstore.JobStoreSupport; +import org.quartz.impl.jdbcjobstore.JobStoreTX; +import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate; +import org.quartz.spi.JobStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.jdbc.datasource.DataSourceUtils; +import org.springframework.stereotype.Component; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute; +import org.springframework.transaction.support.DefaultTransactionDefinition; + +import javax.annotation.PostConstruct; +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; +import javax.persistence.RollbackException; +import javax.persistence.TypedQuery; +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; + +import static org.quartz.SimpleScheduleBuilder.simpleSchedule; + +@SuppressWarnings("SpringJavaAutowiringInspection") +@Component +public class JpaAsyncService implements AsyncService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @PersistenceContext + private EntityManager entityManager; + + @Autowired + private DataSource dataSource; + + private static DataSource dataSourceStatic; + + @Autowired + private PlatformTransactionManager transactionManager; + + private static PlatformTransactionManager transactionManagerStatic; + + @Autowired + private QueueRepository queueRepository; + + @Autowired + private TaskRepository taskRepository; + + private Scheduler scheduler; + + @PostConstruct + public void afterPropertiesSet() throws Exception { + transactionManagerStatic = transactionManager; + dataSourceStatic = dataSource; + log.info("afterPropertiesSet!!"); + Properties quartzProperties = new Properties(); + quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JobStoreTX.class.getName()); + quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "queue"); + quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"); + quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"); +// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat"); + quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName()); + quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10"); + quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); + quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); + SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties); + + Scheduler s = schedulerFactory.getScheduler(); + System.out.println("s.getSchedulerName() = " + s.getSchedulerName()); + scheduler = schedulerFactory.getScheduler("queue"); + } + + @Transactional + public void registerQueue(String name, int interval, Class klass) throws SchedulerException { + + Queue q = queueRepository.findByName(name); + + if (q == null) { + Queue queue = new Queue(name, interval); + queueRepository.save(queue); + } else { + boolean dirty = false; + if (interval != q.getInterval()) { + q.setInterval(interval); + dirty = true; + } + + if (dirty) { + queueRepository.save(q); + } + } + + JobDetailImpl jobDetail = new JobDetailImpl(); + JobKey jobKey = JobKey.jobKey("queue-" + name); + + jobDetail.setKey(jobKey); + jobDetail.setJobClass(AsyncServiceJob.class); + jobDetail.setDurability(true); + JobDataMap map = new JobDataMap(); + map.put("class", klass.getName()); + jobDetail.setJobDataMap(map); + + scheduler.addJob(jobDetail, true); + + SimpleScheduleBuilder schedule = simpleSchedule(). + repeatForever(). + withIntervalInSeconds(interval); + + SimpleTrigger trigger = TriggerBuilder.newTrigger(). + withSchedule(schedule). + withIdentity(jobKey.getName()). + forJob(jobKey). + build(); + + scheduler.scheduleJob(trigger); + } + + @Transactional(readOnly = true) + public JpaQueueRef getQueue(String name) { + TypedQuery query = entityManager.createQuery("select q from io.trygvis.model.Queue q where name = ?1", Queue.class); + query.setParameter(1, name); + List list = query.getResultList(); + System.out.println("list.size() = " + list.size()); + + if (list.size() == 0) { + throw new RollbackException("No such queue: '" + name + "'."); + } + + Queue queue = list.get(0); + + entityManager.detach(query); + + return new JpaQueueRef(queue); + } + + @Transactional + public JpaExecutionRef schedule(JpaQueueRef queue) { + return null; + } + + static class JpaQueueRef implements AsyncService.QueueRef { + public final Queue queue; + + JpaQueueRef(Queue queue) { + this.queue = queue; + } + } + + static class JpaExecutionRef implements AsyncService.ExecutionRef { + } + + public static class AsyncServiceJob implements Job { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public void execute(JobExecutionContext context) throws JobExecutionException { + try { + log.info("Running"); + + SchedulerContext map = context.getScheduler().getContext(); + ApplicationContext applicationContext = (ApplicationContext) map.get("applicationContext"); + + log.info("applicationContext = {}", applicationContext); + + String className = map.getString("class"); + + log.info("className = {}", className); + + Class klass = getClass().getClassLoader().loadClass(className); + Object bean = applicationContext.getBean(klass); + + log.info("bean = {}", bean); + } catch (Exception e) { + log.warn("fail", e); + throw new JobExecutionException(e, false); + } + } + } + + public static class JpaDataSourceJobStore extends JobStoreSupport { + + public JpaDataSourceJobStore() { + setDataSource("wat"); + } + + protected Connection getNonManagedTXConnection() throws JobPersistenceException { +// DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); +// definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); +// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition); + + System.out.println("dataSourceStatic = " + dataSourceStatic); + Connection c = DataSourceUtils.getConnection(dataSourceStatic); + try { + c.setAutoCommit(false); + } catch (SQLException e) { + throw new RuntimeException(e); + } + System.out.println("c = " + c); + + return c; + } + + protected void closeConnection(Connection c) { + try { + DataSourceUtils.doCloseConnection(c, dataSourceStatic); + } catch (SQLException e) { + e.printStackTrace(); + } + + } + + protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException { + return executeInNonManagedTXLock(lockName, txCallback); + } + } +} -- cgit v1.2.3