From 8477e8a888d584cf1352a4b69d7cefb2b7cd9ace Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 14 Apr 2013 18:04:50 +0200 Subject: o Dropping quartz, using straight executor service. --- .../java/io/trygvis/queue/JpaAsyncService.java | 306 ++++++++++----------- 1 file changed, 138 insertions(+), 168 deletions(-) (limited to 'src/main/java/io/trygvis/queue/JpaAsyncService.java') diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java index e715c6d..95d5ef3 100755 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java @@ -1,61 +1,36 @@ package io.trygvis.queue; -import io.trygvis.data.QueueRepository; -import io.trygvis.data.TaskRepository; +import io.trygvis.data.*; import io.trygvis.model.Queue; -import org.quartz.Job; -import org.quartz.JobDataMap; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.JobKey; -import org.quartz.JobPersistenceException; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.SchedulerFactory; -import org.quartz.SimpleScheduleBuilder; -import org.quartz.SimpleTrigger; -import org.quartz.TriggerBuilder; -import org.quartz.impl.JobDetailImpl; -import org.quartz.impl.StdSchedulerFactory; -import org.quartz.impl.jdbcjobstore.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.jdbc.datasource.DataSourceUtils; -import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.annotation.Transactional; - -import javax.annotation.*; -import javax.persistence.EntityManager; -import javax.persistence.PersistenceContext; -import javax.persistence.RollbackException; -import javax.persistence.TypedQuery; -import javax.sql.DataSource; -import java.sql.Connection; -import java.sql.SQLException; +import io.trygvis.model.*; +import org.quartz.*; +import org.slf4j.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.annotation.*; +import org.springframework.transaction.support.*; + +import javax.persistence.*; import java.util.*; +import java.util.concurrent.*; -import static org.quartz.SimpleScheduleBuilder.simpleSchedule; +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; @SuppressWarnings("SpringJavaAutowiringInspection") @Component -public class JpaAsyncService implements AsyncService, - ApplicationContextAware { +public class JpaAsyncService implements AsyncService/*, ApplicationContextAware*/ { private final Logger log = LoggerFactory.getLogger(getClass()); + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + @PersistenceContext private EntityManager entityManager; @Autowired - private DataSource dataSource; - - @Autowired - private PlatformTransactionManager transactionManager; + private TransactionTemplate transactionTemplate; @Autowired private QueueRepository queueRepository; @@ -63,59 +38,17 @@ public class JpaAsyncService implements AsyncService callables = new ArrayList<>(); - } - - private final Context context = new Context(); - - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.context.applicationContext = applicationContext; - } - - @PostConstruct - public void postConstruct() throws Exception { - transactionManagerStatic = transactionManager; - dataSourceStatic = dataSource; - log.info("afterPropertiesSet!!"); - Properties quartzProperties = new Properties(); - quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JobStoreTX.class.getName()); - quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "queue"); - quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"); - quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"); -// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat"); - quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName()); - quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_LOCK_HANDLER_CLASS, SimpleSemaphore.class.getName()); - quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10"); - quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); - quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); - SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties); - - scheduler = schedulerFactory.getScheduler(); - scheduler.getContext().put("context", context); - scheduler.start(); - } - - @PreDestroy - public void preDestroy() throws Exception { - scheduler.shutdown(); - } - @Transactional - public synchronized void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { + public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); Queue q = queueRepository.findByName(name); + log.info("q = {}", q); + if (q == null) { - Queue queue = new Queue(name, interval); - queueRepository.save(queue); + q = new Queue(name, interval); + q = queueRepository.save(q); } else { boolean dirty = false; if (interval != q.getInterval()) { @@ -124,133 +57,170 @@ public class JpaAsyncService implements AsyncService query = entityManager.createQuery("select q from io.trygvis.model.Queue q where name = ?1", Queue.class); - query.setParameter(1, name); - List list = query.getResultList(); - System.out.println("list.size() = " + list.size()); + Queue queue = queueRepository.findByName(name); - if (list.size() == 0) { + if (queue == null) { throw new RollbackException("No such queue: '" + name + "'."); } - Queue queue = list.get(0); - - entityManager.detach(query); + entityManager.detach(queue); return new JpaQueueRef(queue); } @Transactional - public JpaExecutionRef schedule(JpaQueueRef queue) { - return null; + public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { + log.info("schedule: ENTER"); + Date scheduled = new Date(); + Task task = new Task(queue.queue, scheduled, args); + log.info("task = {}", task); + taskRepository.save(task); + log.info("task = {}", task); +// entityManager.detach(task); + log.info("schedule: LEAVE"); + return new JpaExecutionRef(task); } - static class JpaQueueRef implements AsyncService.QueueRef { + public static class JpaQueueRef implements AsyncService.QueueRef { public final Queue queue; JpaQueueRef(Queue queue) { this.queue = queue; } - } - static class JpaExecutionRef implements AsyncService.ExecutionRef { + public String toString() { + return "JpaQueueRef{" + + "queue=" + queue + + '}'; + } } - public static class AsyncServiceJob implements Job { - private final Logger log = LoggerFactory.getLogger(getClass()); + public static class JpaExecutionRef implements AsyncService.ExecutionRef { + private final Task task; - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - try { - log.info("Running"); + public JpaExecutionRef(Task task) { + this.task = task; + } - Context context = (Context) jobExecutionContext.getScheduler().getContext().get("context"); + public String toString() { + return "JpaExecutionRef{" + + "task=" + task + + '}'; + } + } - for (Map.Entry entry : jobExecutionContext.getMergedJobDataMap().entrySet()) { - log.info("{} = {}, class= {}", entry.getKey(), entry.getValue(), entry.getValue() != null ? entry.getValue().getClass() : ""); - } + private class CheckTimerTask implements Runnable { + private final AsyncCallable callable; + private final JpaQueueRef queueRef; + + private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { + this.callable = callable; + this.queueRef = queueRef; + } + + public void run() { + log.info("JpaAsyncService$CheckTimerTask.run"); - int index = jobExecutionContext.getMergedJobDataMap().getIntFromString("index"); - AsyncCallable callable = context.callables.get(index); + List tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); - log.info("Calling {}", callable); - callable.run(); + System.out.println("tasks.size() = " + tasks.size()); - } catch (Throwable e) { - log.warn("fail", e); -// throw new JobExecutionException(e, false); + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while execution tasks.", e); } } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + task.registerRun(); + taskRepository.save(task); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + log.info("Setting completed on task. date = {}, task = {}", completed, task); + task.registerComplete(completed); + taskRepository.save(task); + } catch (Exception e) { + throw new RuntimeException("Error while executing callback", e); + } + } + }); + } } - public static class JpaDataSourceJobStore extends JobStoreSupport { + private class MyTransactionSynchronization implements TransactionSynchronization { + + private final AsyncCallable callable; - public JpaDataSourceJobStore() { - setDataSource("wat"); + private final int interval; + + private final JpaQueueRef queueRef; + + public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { + this.callable = callable; + this.interval = interval; + this.queueRef = queueRef; } - protected Connection getNonManagedTXConnection() throws JobPersistenceException { -// DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); -// definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); -// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition); + public void suspend() { + } -// System.out.println("dataSourceStatic = " + dataSourceStatic); - Connection c = DataSourceUtils.getConnection(dataSourceStatic); - try { - c.setAutoCommit(false); - } catch (SQLException e) { - throw new RuntimeException(e); - } -// System.out.println("c = " + c); + public void resume() { + } - return c; + public void flush() { } - protected void closeConnection(Connection c) { - try { - DataSourceUtils.doCloseConnection(c, dataSourceStatic); - } catch (SQLException e) { - e.printStackTrace(); - } + public void beforeCommit(boolean readOnly) { + } + public void beforeCompletion() { } - protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException { - return executeInNonManagedTXLock(lockName, txCallback); + public void afterCommit() { + } + + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); + } } } } -- cgit v1.2.3