aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JpaAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JpaAsyncService.java')
-rwxr-xr-xsrc/main/java/io/trygvis/queue/JpaAsyncService.java246
1 files changed, 246 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java
new file mode 100755
index 0000000..2d6c2df
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java
@@ -0,0 +1,246 @@
+package io.trygvis.queue;
+
+import io.trygvis.data.QueueRepository;
+import io.trygvis.data.TaskRepository;
+import io.trygvis.model.Queue;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.JobPersistenceException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerContext;
+import org.quartz.SchedulerException;
+import org.quartz.SchedulerFactory;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.SimpleTrigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.DirectSchedulerFactory;
+import org.quartz.impl.JobDetailImpl;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.jdbcjobstore.JobStoreSupport;
+import org.quartz.impl.jdbcjobstore.JobStoreTX;
+import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
+import org.quartz.spi.JobStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.jdbc.datasource.DataSourceUtils;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
+import org.springframework.transaction.support.DefaultTransactionDefinition;
+
+import javax.annotation.PostConstruct;
+import javax.persistence.EntityManager;
+import javax.persistence.PersistenceContext;
+import javax.persistence.RollbackException;
+import javax.persistence.TypedQuery;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+
+@SuppressWarnings("SpringJavaAutowiringInspection")
+@Component
+public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef> {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @PersistenceContext
+ private EntityManager entityManager;
+
+ @Autowired
+ private DataSource dataSource;
+
+ private static DataSource dataSourceStatic;
+
+ @Autowired
+ private PlatformTransactionManager transactionManager;
+
+ private static PlatformTransactionManager transactionManagerStatic;
+
+ @Autowired
+ private QueueRepository queueRepository;
+
+ @Autowired
+ private TaskRepository taskRepository;
+
+ private Scheduler scheduler;
+
+ @PostConstruct
+ public void afterPropertiesSet() throws Exception {
+ transactionManagerStatic = transactionManager;
+ dataSourceStatic = dataSource;
+ log.info("afterPropertiesSet!!");
+ Properties quartzProperties = new Properties();
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JobStoreTX.class.getName());
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "queue");
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
+// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat");
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName());
+ quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10");
+ quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
+ quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");
+ SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties);
+
+ Scheduler s = schedulerFactory.getScheduler();
+ System.out.println("s.getSchedulerName() = " + s.getSchedulerName());
+ scheduler = schedulerFactory.getScheduler("queue");
+ }
+
+ @Transactional
+ public void registerQueue(String name, int interval, Class klass) throws SchedulerException {
+
+ Queue q = queueRepository.findByName(name);
+
+ if (q == null) {
+ Queue queue = new Queue(name, interval);
+ queueRepository.save(queue);
+ } else {
+ boolean dirty = false;
+ if (interval != q.getInterval()) {
+ q.setInterval(interval);
+ dirty = true;
+ }
+
+ if (dirty) {
+ queueRepository.save(q);
+ }
+ }
+
+ JobDetailImpl jobDetail = new JobDetailImpl();
+ JobKey jobKey = JobKey.jobKey("queue-" + name);
+
+ jobDetail.setKey(jobKey);
+ jobDetail.setJobClass(AsyncServiceJob.class);
+ jobDetail.setDurability(true);
+ JobDataMap map = new JobDataMap();
+ map.put("class", klass.getName());
+ jobDetail.setJobDataMap(map);
+
+ scheduler.addJob(jobDetail, true);
+
+ SimpleScheduleBuilder schedule = simpleSchedule().
+ repeatForever().
+ withIntervalInSeconds(interval);
+
+ SimpleTrigger trigger = TriggerBuilder.newTrigger().
+ withSchedule(schedule).
+ withIdentity(jobKey.getName()).
+ forJob(jobKey).
+ build();
+
+ scheduler.scheduleJob(trigger);
+ }
+
+ @Transactional(readOnly = true)
+ public JpaQueueRef getQueue(String name) {
+ TypedQuery<Queue> query = entityManager.createQuery("select q from io.trygvis.model.Queue q where name = ?1", Queue.class);
+ query.setParameter(1, name);
+ List<Queue> list = query.getResultList();
+ System.out.println("list.size() = " + list.size());
+
+ if (list.size() == 0) {
+ throw new RollbackException("No such queue: '" + name + "'.");
+ }
+
+ Queue queue = list.get(0);
+
+ entityManager.detach(query);
+
+ return new JpaQueueRef(queue);
+ }
+
+ @Transactional
+ public JpaExecutionRef schedule(JpaQueueRef queue) {
+ return null;
+ }
+
+ static class JpaQueueRef implements AsyncService.QueueRef {
+ public final Queue queue;
+
+ JpaQueueRef(Queue queue) {
+ this.queue = queue;
+ }
+ }
+
+ static class JpaExecutionRef implements AsyncService.ExecutionRef {
+ }
+
+ public static class AsyncServiceJob implements Job {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public void execute(JobExecutionContext context) throws JobExecutionException {
+ try {
+ log.info("Running");
+
+ SchedulerContext map = context.getScheduler().getContext();
+ ApplicationContext applicationContext = (ApplicationContext) map.get("applicationContext");
+
+ log.info("applicationContext = {}", applicationContext);
+
+ String className = map.getString("class");
+
+ log.info("className = {}", className);
+
+ Class klass = getClass().getClassLoader().loadClass(className);
+ Object bean = applicationContext.getBean(klass);
+
+ log.info("bean = {}", bean);
+ } catch (Exception e) {
+ log.warn("fail", e);
+ throw new JobExecutionException(e, false);
+ }
+ }
+ }
+
+ public static class JpaDataSourceJobStore extends JobStoreSupport {
+
+ public JpaDataSourceJobStore() {
+ setDataSource("wat");
+ }
+
+ protected Connection getNonManagedTXConnection() throws JobPersistenceException {
+// DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
+// definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition);
+
+ System.out.println("dataSourceStatic = " + dataSourceStatic);
+ Connection c = DataSourceUtils.getConnection(dataSourceStatic);
+ try {
+ c.setAutoCommit(false);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ System.out.println("c = " + c);
+
+ return c;
+ }
+
+ protected void closeConnection(Connection c) {
+ try {
+ DataSourceUtils.doCloseConnection(c, dataSourceStatic);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {
+ return executeInNonManagedTXLock(lockName, txCallback);
+ }
+ }
+}