aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JpaAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JpaAsyncService.java')
-rwxr-xr-xsrc/main/java/io/trygvis/queue/JpaAsyncService.java306
1 files changed, 138 insertions, 168 deletions
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java
index e715c6d..95d5ef3 100755
--- a/src/main/java/io/trygvis/queue/JpaAsyncService.java
+++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java
@@ -1,61 +1,36 @@
package io.trygvis.queue;
-import io.trygvis.data.QueueRepository;
-import io.trygvis.data.TaskRepository;
+import io.trygvis.data.*;
import io.trygvis.model.Queue;
-import org.quartz.Job;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.JobKey;
-import org.quartz.JobPersistenceException;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.SchedulerFactory;
-import org.quartz.SimpleScheduleBuilder;
-import org.quartz.SimpleTrigger;
-import org.quartz.TriggerBuilder;
-import org.quartz.impl.JobDetailImpl;
-import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.jdbcjobstore.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.jdbc.datasource.DataSourceUtils;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.annotation.Transactional;
-
-import javax.annotation.*;
-import javax.persistence.EntityManager;
-import javax.persistence.PersistenceContext;
-import javax.persistence.RollbackException;
-import javax.persistence.TypedQuery;
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.SQLException;
+import io.trygvis.model.*;
+import org.quartz.*;
+import org.slf4j.*;
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.stereotype.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.annotation.*;
+import org.springframework.transaction.support.*;
+
+import javax.persistence.*;
import java.util.*;
+import java.util.concurrent.*;
-import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+import static java.util.concurrent.TimeUnit.*;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
@SuppressWarnings("SpringJavaAutowiringInspection")
@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>,
- ApplicationContextAware {
+public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
@PersistenceContext
private EntityManager entityManager;
@Autowired
- private DataSource dataSource;
-
- @Autowired
- private PlatformTransactionManager transactionManager;
+ private TransactionTemplate transactionTemplate;
@Autowired
private QueueRepository queueRepository;
@@ -63,59 +38,17 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
@Autowired
private TaskRepository taskRepository;
- private Scheduler scheduler;
-
- private static DataSource dataSourceStatic;
-
- private static PlatformTransactionManager transactionManagerStatic;
-
- private static class Context {
- ApplicationContext applicationContext;
- List<AsyncCallable> callables = new ArrayList<>();
- }
-
- private final Context context = new Context();
-
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.context.applicationContext = applicationContext;
- }
-
- @PostConstruct
- public void postConstruct() throws Exception {
- transactionManagerStatic = transactionManager;
- dataSourceStatic = dataSource;
- log.info("afterPropertiesSet!!");
- Properties quartzProperties = new Properties();
- quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JobStoreTX.class.getName());
- quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "queue");
- quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
- quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
-// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat");
- quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName());
- quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_LOCK_HANDLER_CLASS, SimpleSemaphore.class.getName());
- quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10");
- quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
- quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");
- SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties);
-
- scheduler = schedulerFactory.getScheduler();
- scheduler.getContext().put("context", context);
- scheduler.start();
- }
-
- @PreDestroy
- public void preDestroy() throws Exception {
- scheduler.shutdown();
- }
-
@Transactional
- public synchronized void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
+ public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
+ log.info("registerQueue: ENTER");
Queue q = queueRepository.findByName(name);
+ log.info("q = {}", q);
+
if (q == null) {
- Queue queue = new Queue(name, interval);
- queueRepository.save(queue);
+ q = new Queue(name, interval);
+ q = queueRepository.save(q);
} else {
boolean dirty = false;
if (interval != q.getInterval()) {
@@ -124,133 +57,170 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
}
if (dirty) {
- queueRepository.save(q);
+ q = queueRepository.save(q);
}
}
- context.callables.add(callable);
- int index = context.callables.size() - 1;
+ log.info("q = {}", q);
+ entityManager.flush();
+// entityManager.detach(q);
+ log.info("q = {}", q);
- JobDetailImpl jobDetail = new JobDetailImpl();
- JobKey jobKey = JobKey.jobKey("queue-" + name);
+ JpaQueueRef queueRef = new JpaQueueRef(q);
- jobDetail.setKey(jobKey);
- jobDetail.setJobClass(AsyncServiceJob.class);
- jobDetail.setDurability(true);
- jobDetail.getJobDataMap().put("index", Integer.toString(index));
- scheduler.addJob(jobDetail, true);
+ log.info("registerQueue: LEAVE");
- SimpleScheduleBuilder schedule = simpleSchedule().
- repeatForever().
- withIntervalInSeconds(interval);
-
- SimpleTrigger trigger = TriggerBuilder.newTrigger().
- withSchedule(schedule).
- withIdentity(jobKey.getName()).
- forJob(jobKey).
- build();
-
- if(scheduler.checkExists(trigger.getKey())) {
- scheduler.unscheduleJob(trigger.getKey());
- }
+ registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
- scheduler.scheduleJob(trigger);
+ return queueRef;
}
@Transactional(readOnly = true)
public JpaQueueRef getQueue(String name) {
- TypedQuery<Queue> query = entityManager.createQuery("select q from io.trygvis.model.Queue q where name = ?1", Queue.class);
- query.setParameter(1, name);
- List<Queue> list = query.getResultList();
- System.out.println("list.size() = " + list.size());
+ Queue queue = queueRepository.findByName(name);
- if (list.size() == 0) {
+ if (queue == null) {
throw new RollbackException("No such queue: '" + name + "'.");
}
- Queue queue = list.get(0);
-
- entityManager.detach(query);
+ entityManager.detach(queue);
return new JpaQueueRef(queue);
}
@Transactional
- public JpaExecutionRef schedule(JpaQueueRef queue) {
- return null;
+ public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
+ log.info("schedule: ENTER");
+ Date scheduled = new Date();
+ Task task = new Task(queue.queue, scheduled, args);
+ log.info("task = {}", task);
+ taskRepository.save(task);
+ log.info("task = {}", task);
+// entityManager.detach(task);
+ log.info("schedule: LEAVE");
+ return new JpaExecutionRef(task);
}
- static class JpaQueueRef implements AsyncService.QueueRef {
+ public static class JpaQueueRef implements AsyncService.QueueRef {
public final Queue queue;
JpaQueueRef(Queue queue) {
this.queue = queue;
}
- }
- static class JpaExecutionRef implements AsyncService.ExecutionRef {
+ public String toString() {
+ return "JpaQueueRef{" +
+ "queue=" + queue +
+ '}';
+ }
}
- public static class AsyncServiceJob implements Job {
- private final Logger log = LoggerFactory.getLogger(getClass());
+ public static class JpaExecutionRef implements AsyncService.ExecutionRef {
+ private final Task task;
- public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- try {
- log.info("Running");
+ public JpaExecutionRef(Task task) {
+ this.task = task;
+ }
- Context context = (Context) jobExecutionContext.getScheduler().getContext().get("context");
+ public String toString() {
+ return "JpaExecutionRef{" +
+ "task=" + task +
+ '}';
+ }
+ }
- for (Map.Entry<String, Object> entry : jobExecutionContext.getMergedJobDataMap().entrySet()) {
- log.info("{} = {}, class= {}", entry.getKey(), entry.getValue(), entry.getValue() != null ? entry.getValue().getClass() : "<null>");
- }
+ private class CheckTimerTask implements Runnable {
+ private final AsyncCallable callable;
+ private final JpaQueueRef queueRef;
+
+ private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
+ this.callable = callable;
+ this.queueRef = queueRef;
+ }
+
+ public void run() {
+ log.info("JpaAsyncService$CheckTimerTask.run");
- int index = jobExecutionContext.getMergedJobDataMap().getIntFromString("index");
- AsyncCallable callable = context.callables.get(index);
+ List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
- log.info("Calling {}", callable);
- callable.run();
+ System.out.println("tasks.size() = " + tasks.size());
- } catch (Throwable e) {
- log.warn("fail", e);
-// throw new JobExecutionException(e, false);
+ try {
+ for (final Task task : tasks) {
+ try {
+ executeTask(task);
+ } catch (TransactionException e) {
+ log.warn("Task execution failed", e);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Error while execution tasks.", e);
}
}
+
+ private void executeTask(final Task task) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ task.registerRun();
+ taskRepository.save(task);
+ }
+ });
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ try {
+ callable.run();
+ Date completed = new Date();
+ log.info("Setting completed on task. date = {}, task = {}", completed, task);
+ task.registerComplete(completed);
+ taskRepository.save(task);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while executing callback", e);
+ }
+ }
+ });
+ }
}
- public static class JpaDataSourceJobStore extends JobStoreSupport {
+ private class MyTransactionSynchronization implements TransactionSynchronization {
+
+ private final AsyncCallable callable;
- public JpaDataSourceJobStore() {
- setDataSource("wat");
+ private final int interval;
+
+ private final JpaQueueRef queueRef;
+
+ public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
+ this.callable = callable;
+ this.interval = interval;
+ this.queueRef = queueRef;
}
- protected Connection getNonManagedTXConnection() throws JobPersistenceException {
-// DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
-// definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
-// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition);
+ public void suspend() {
+ }
-// System.out.println("dataSourceStatic = " + dataSourceStatic);
- Connection c = DataSourceUtils.getConnection(dataSourceStatic);
- try {
- c.setAutoCommit(false);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
-// System.out.println("c = " + c);
+ public void resume() {
+ }
- return c;
+ public void flush() {
}
- protected void closeConnection(Connection c) {
- try {
- DataSourceUtils.doCloseConnection(c, dataSourceStatic);
- } catch (SQLException e) {
- e.printStackTrace();
- }
+ public void beforeCommit(boolean readOnly) {
+ }
+ public void beforeCompletion() {
}
- protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException {
- return executeInNonManagedTXLock(lockName, txCallback);
+ public void afterCommit() {
+ }
+
+ public void afterCompletion(int status) {
+ log.info("status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
+ }
}
}
}