aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JpaAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JpaAsyncService.java')
-rwxr-xr-xsrc/main/java/io/trygvis/queue/JpaAsyncService.java94
1 files changed, 52 insertions, 42 deletions
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java
index 2d6c2df..e715c6d 100755
--- a/src/main/java/io/trygvis/queue/JpaAsyncService.java
+++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java
@@ -10,36 +10,26 @@ import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.JobPersistenceException;
import org.quartz.Scheduler;
-import org.quartz.SchedulerContext;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerBuilder;
-import org.quartz.impl.DirectSchedulerFactory;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.jdbcjobstore.JobStoreSupport;
-import org.quartz.impl.jdbcjobstore.JobStoreTX;
-import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
-import org.quartz.spi.JobStore;
+import org.quartz.impl.jdbcjobstore.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
-import javax.annotation.PostConstruct;
+import javax.annotation.*;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.RollbackException;
@@ -47,14 +37,14 @@ import javax.persistence.TypedQuery;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
@SuppressWarnings("SpringJavaAutowiringInspection")
@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef> {
+public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>,
+ ApplicationContextAware {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -64,13 +54,9 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
@Autowired
private DataSource dataSource;
- private static DataSource dataSourceStatic;
-
@Autowired
private PlatformTransactionManager transactionManager;
- private static PlatformTransactionManager transactionManagerStatic;
-
@Autowired
private QueueRepository queueRepository;
@@ -79,8 +65,23 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
private Scheduler scheduler;
+ private static DataSource dataSourceStatic;
+
+ private static PlatformTransactionManager transactionManagerStatic;
+
+ private static class Context {
+ ApplicationContext applicationContext;
+ List<AsyncCallable> callables = new ArrayList<>();
+ }
+
+ private final Context context = new Context();
+
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.context.applicationContext = applicationContext;
+ }
+
@PostConstruct
- public void afterPropertiesSet() throws Exception {
+ public void postConstruct() throws Exception {
transactionManagerStatic = transactionManager;
dataSourceStatic = dataSource;
log.info("afterPropertiesSet!!");
@@ -91,18 +92,24 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat");
quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName());
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_LOCK_HANDLER_CLASS, SimpleSemaphore.class.getName());
quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10");
quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");
SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties);
- Scheduler s = schedulerFactory.getScheduler();
- System.out.println("s.getSchedulerName() = " + s.getSchedulerName());
- scheduler = schedulerFactory.getScheduler("queue");
+ scheduler = schedulerFactory.getScheduler();
+ scheduler.getContext().put("context", context);
+ scheduler.start();
+ }
+
+ @PreDestroy
+ public void preDestroy() throws Exception {
+ scheduler.shutdown();
}
@Transactional
- public void registerQueue(String name, int interval, Class klass) throws SchedulerException {
+ public synchronized void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
Queue q = queueRepository.findByName(name);
@@ -121,16 +128,16 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
}
}
+ context.callables.add(callable);
+ int index = context.callables.size() - 1;
+
JobDetailImpl jobDetail = new JobDetailImpl();
JobKey jobKey = JobKey.jobKey("queue-" + name);
jobDetail.setKey(jobKey);
jobDetail.setJobClass(AsyncServiceJob.class);
jobDetail.setDurability(true);
- JobDataMap map = new JobDataMap();
- map.put("class", klass.getName());
- jobDetail.setJobDataMap(map);
-
+ jobDetail.getJobDataMap().put("index", Integer.toString(index));
scheduler.addJob(jobDetail, true);
SimpleScheduleBuilder schedule = simpleSchedule().
@@ -143,6 +150,10 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
forJob(jobKey).
build();
+ if(scheduler.checkExists(trigger.getKey())) {
+ scheduler.unscheduleJob(trigger.getKey());
+ }
+
scheduler.scheduleJob(trigger);
}
@@ -183,26 +194,25 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
public static class AsyncServiceJob implements Job {
private final Logger log = LoggerFactory.getLogger(getClass());
- public void execute(JobExecutionContext context) throws JobExecutionException {
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
log.info("Running");
- SchedulerContext map = context.getScheduler().getContext();
- ApplicationContext applicationContext = (ApplicationContext) map.get("applicationContext");
-
- log.info("applicationContext = {}", applicationContext);
+ Context context = (Context) jobExecutionContext.getScheduler().getContext().get("context");
- String className = map.getString("class");
+ for (Map.Entry<String, Object> entry : jobExecutionContext.getMergedJobDataMap().entrySet()) {
+ log.info("{} = {}, class= {}", entry.getKey(), entry.getValue(), entry.getValue() != null ? entry.getValue().getClass() : "<null>");
+ }
- log.info("className = {}", className);
+ int index = jobExecutionContext.getMergedJobDataMap().getIntFromString("index");
+ AsyncCallable callable = context.callables.get(index);
- Class klass = getClass().getClassLoader().loadClass(className);
- Object bean = applicationContext.getBean(klass);
+ log.info("Calling {}", callable);
+ callable.run();
- log.info("bean = {}", bean);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("fail", e);
- throw new JobExecutionException(e, false);
+// throw new JobExecutionException(e, false);
}
}
}
@@ -218,14 +228,14 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
// definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition);
- System.out.println("dataSourceStatic = " + dataSourceStatic);
+// System.out.println("dataSourceStatic = " + dataSourceStatic);
Connection c = DataSourceUtils.getConnection(dataSourceStatic);
try {
c.setAutoCommit(false);
} catch (SQLException e) {
throw new RuntimeException(e);
}
- System.out.println("c = " + c);
+// System.out.println("c = " + c);
return c;
}