aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-04-11 19:12:27 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-04-11 19:12:27 +0200
commitb59f08bb5b08be7972086037ec54b23ea9fb49f8 (patch)
treeef168ed7da2aa9fe8766a0d1625aa40e36a8ad99
parentcd99d57cccb88ea8a058eca530d62a81a665983c (diff)
downloadquartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.tar.gz
quartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.tar.bz2
quartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.tar.xz
quartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.zip
wip
-rwxr-xr-xsrc/main/java/io/trygvis/CreateArticleCallable.java33
-rwxr-xr-xsrc/main/java/io/trygvis/Main.java12
-rwxr-xr-xsrc/main/java/io/trygvis/MyJob.java38
-rwxr-xr-xsrc/main/java/io/trygvis/UpdateArticeCallable.java43
-rwxr-xr-xsrc/main/java/io/trygvis/queue/AsyncService.java6
-rwxr-xr-xsrc/main/java/io/trygvis/queue/JpaAsyncService.java94
-rwxr-xr-xsrc/main/java/io/trygvis/spring/Config.java5
-rwxr-xr-xsrc/main/resources/logback.xml1
8 files changed, 143 insertions, 89 deletions
diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java
new file mode 100755
index 0000000..e727b46
--- /dev/null
+++ b/src/main/java/io/trygvis/CreateArticleCallable.java
@@ -0,0 +1,33 @@
+package io.trygvis;
+
+import io.trygvis.model.*;
+import io.trygvis.queue.*;
+import org.slf4j.*;
+import org.springframework.stereotype.*;
+import org.springframework.transaction.annotation.*;
+
+import java.util.*;
+import javax.persistence.*;
+
+import static org.springframework.transaction.annotation.Propagation.*;
+
+@Component
+public class CreateArticleCallable implements AsyncService.AsyncCallable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @PersistenceContext
+ private EntityManager entityManager;
+
+// @Transactional(propagation = REQUIRES_NEW)
+ public void run() throws Exception {
+ log.info("CreateArticeJob.run: BEGIN");
+ Date now = new Date();
+
+ log.info("now = {}", now);
+
+ Article article = new Article(new Date(), null, "title", "body");
+ entityManager.persist(article);
+
+ log.info("CreateArticeJob.run: END");
+ }
+}
diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java
index 19167d7..aafb728 100755
--- a/src/main/java/io/trygvis/Main.java
+++ b/src/main/java/io/trygvis/Main.java
@@ -29,7 +29,6 @@ public class Main {
log.info("Starting context");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
- context.start();
log.info("Started context");
try {
@@ -50,12 +49,19 @@ public class Main {
@Autowired
private AsyncService<AsyncService.QueueRef, AsyncService.ExecutionRef> asyncService;
+ @Autowired
+ private CreateArticleCallable createArticleCallable;
+
+ @Autowired
+ private UpdateArticeCallable updateArticeCallable;
+
public void run() throws Exception {
log.info("Main.run");
- asyncService.registerQueue("test-queue", 1, MyJob.class);
+ asyncService.registerQueue("create-queue", 1, createArticleCallable);
+// asyncService.registerQueue("update-queue", 1, updateArticeCallable);
- AsyncService.QueueRef queue = asyncService.getQueue("test-queue");
+ AsyncService.QueueRef queue = asyncService.getQueue("create-queue");
AsyncService.ExecutionRef executionRef = asyncService.schedule(queue);
}
diff --git a/src/main/java/io/trygvis/MyJob.java b/src/main/java/io/trygvis/MyJob.java
deleted file mode 100755
index 7303a33..0000000
--- a/src/main/java/io/trygvis/MyJob.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package io.trygvis;
-
-import io.trygvis.model.*;
-import org.quartz.*;
-import org.slf4j.*;
-
-import java.util.*;
-import javax.persistence.*;
-
-public class MyJob implements Job {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @PersistenceContext
- private EntityManager entityManager;
-
- public void execute(JobExecutionContext context) throws JobExecutionException {
- log.info("MyJob.execute: BEGIN");
- log.info("entityManager = {}", entityManager);
- log.info("context.getJobDetail().getKey() = {}", context.getJobDetail().getKey());
-
-/*
- Date now = new Date();
-
- log.info("now = {}", now);
-
-// Article article = entityManager.find(Article.class, 1);
-//
-// System.out.println("article = " + article);
-// article.setUpdated(now);
-// entityManager.persist(article);
-
- Article article = new Article(new Date(), null, "title", "body");
- entityManager.persist(article);
-*/
-
- log.info("MyJob.execute: END");
- }
-}
diff --git a/src/main/java/io/trygvis/UpdateArticeCallable.java b/src/main/java/io/trygvis/UpdateArticeCallable.java
new file mode 100755
index 0000000..d022655
--- /dev/null
+++ b/src/main/java/io/trygvis/UpdateArticeCallable.java
@@ -0,0 +1,43 @@
+package io.trygvis;
+
+import io.trygvis.model.*;
+import io.trygvis.queue.*;
+import org.slf4j.*;
+import org.springframework.stereotype.*;
+import org.springframework.transaction.annotation.*;
+
+import java.util.*;
+import javax.persistence.*;
+
+import static org.springframework.transaction.annotation.Propagation.*;
+
+@Component
+public class UpdateArticeCallable implements AsyncService.AsyncCallable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final Random r = new Random();
+
+ @PersistenceContext
+ private EntityManager entityManager;
+
+// @Transactional(propagation = REQUIRES_NEW)
+ public void run() throws Exception {
+ log.info("UpdateArticeJob.run: BEGIN");
+
+ Date now = new Date();
+
+ log.info("now = {}", now);
+
+ TypedQuery<Article> q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class));
+
+ List<Article> list = q.getResultList();
+ log.info("Got {} articles", list.size());
+
+ Article a = list.get(r.nextInt(list.size()));
+ a.setUpdated(new Date());
+
+ entityManager.persist(a);
+
+ log.info("UpdateArticeJob.run: END");
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java
index dcbe991..de0a1af 100755
--- a/src/main/java/io/trygvis/queue/AsyncService.java
+++ b/src/main/java/io/trygvis/queue/AsyncService.java
@@ -4,7 +4,7 @@ import org.quartz.*;
public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
- void registerQueue(String name, int interval, Class klass) throws SchedulerException;
+ void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
QueueRef getQueue(String name);
@@ -15,4 +15,8 @@ public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionR
interface ExecutionRef {
}
+
+ interface AsyncCallable {
+ void run() throws Exception;
+ }
}
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java
index 2d6c2df..e715c6d 100755
--- a/src/main/java/io/trygvis/queue/JpaAsyncService.java
+++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java
@@ -10,36 +10,26 @@ import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.JobPersistenceException;
import org.quartz.Scheduler;
-import org.quartz.SchedulerContext;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerBuilder;
-import org.quartz.impl.DirectSchedulerFactory;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.jdbcjobstore.JobStoreSupport;
-import org.quartz.impl.jdbcjobstore.JobStoreTX;
-import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
-import org.quartz.spi.JobStore;
+import org.quartz.impl.jdbcjobstore.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
-import javax.annotation.PostConstruct;
+import javax.annotation.*;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.RollbackException;
@@ -47,14 +37,14 @@ import javax.persistence.TypedQuery;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
@SuppressWarnings("SpringJavaAutowiringInspection")
@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef> {
+public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>,
+ ApplicationContextAware {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -64,13 +54,9 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
@Autowired
private DataSource dataSource;
- private static DataSource dataSourceStatic;
-
@Autowired
private PlatformTransactionManager transactionManager;
- private static PlatformTransactionManager transactionManagerStatic;
-
@Autowired
private QueueRepository queueRepository;
@@ -79,8 +65,23 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
private Scheduler scheduler;
+ private static DataSource dataSourceStatic;
+
+ private static PlatformTransactionManager transactionManagerStatic;
+
+ private static class Context {
+ ApplicationContext applicationContext;
+ List<AsyncCallable> callables = new ArrayList<>();
+ }
+
+ private final Context context = new Context();
+
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.context.applicationContext = applicationContext;
+ }
+
@PostConstruct
- public void afterPropertiesSet() throws Exception {
+ public void postConstruct() throws Exception {
transactionManagerStatic = transactionManager;
dataSourceStatic = dataSource;
log.info("afterPropertiesSet!!");
@@ -91,18 +92,24 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat");
quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName());
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_LOCK_HANDLER_CLASS, SimpleSemaphore.class.getName());
quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10");
quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");
SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties);
- Scheduler s = schedulerFactory.getScheduler();
- System.out.println("s.getSchedulerName() = " + s.getSchedulerName());
- scheduler = schedulerFactory.getScheduler("queue");
+ scheduler = schedulerFactory.getScheduler();
+ scheduler.getContext().put("context", context);
+ scheduler.start();
+ }
+
+ @PreDestroy
+ public void preDestroy() throws Exception {
+ scheduler.shutdown();
}
@Transactional
- public void registerQueue(String name, int interval, Class klass) throws SchedulerException {
+ public synchronized void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
Queue q = queueRepository.findByName(name);
@@ -121,16 +128,16 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
}
}
+ context.callables.add(callable);
+ int index = context.callables.size() - 1;
+
JobDetailImpl jobDetail = new JobDetailImpl();
JobKey jobKey = JobKey.jobKey("queue-" + name);
jobDetail.setKey(jobKey);
jobDetail.setJobClass(AsyncServiceJob.class);
jobDetail.setDurability(true);
- JobDataMap map = new JobDataMap();
- map.put("class", klass.getName());
- jobDetail.setJobDataMap(map);
-
+ jobDetail.getJobDataMap().put("index", Integer.toString(index));
scheduler.addJob(jobDetail, true);
SimpleScheduleBuilder schedule = simpleSchedule().
@@ -143,6 +150,10 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
forJob(jobKey).
build();
+ if(scheduler.checkExists(trigger.getKey())) {
+ scheduler.unscheduleJob(trigger.getKey());
+ }
+
scheduler.scheduleJob(trigger);
}
@@ -183,26 +194,25 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
public static class AsyncServiceJob implements Job {
private final Logger log = LoggerFactory.getLogger(getClass());
- public void execute(JobExecutionContext context) throws JobExecutionException {
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
log.info("Running");
- SchedulerContext map = context.getScheduler().getContext();
- ApplicationContext applicationContext = (ApplicationContext) map.get("applicationContext");
-
- log.info("applicationContext = {}", applicationContext);
+ Context context = (Context) jobExecutionContext.getScheduler().getContext().get("context");
- String className = map.getString("class");
+ for (Map.Entry<String, Object> entry : jobExecutionContext.getMergedJobDataMap().entrySet()) {
+ log.info("{} = {}, class= {}", entry.getKey(), entry.getValue(), entry.getValue() != null ? entry.getValue().getClass() : "<null>");
+ }
- log.info("className = {}", className);
+ int index = jobExecutionContext.getMergedJobDataMap().getIntFromString("index");
+ AsyncCallable callable = context.callables.get(index);
- Class klass = getClass().getClassLoader().loadClass(className);
- Object bean = applicationContext.getBean(klass);
+ log.info("Calling {}", callable);
+ callable.run();
- log.info("bean = {}", bean);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("fail", e);
- throw new JobExecutionException(e, false);
+// throw new JobExecutionException(e, false);
}
}
}
@@ -218,14 +228,14 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
// definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition);
- System.out.println("dataSourceStatic = " + dataSourceStatic);
+// System.out.println("dataSourceStatic = " + dataSourceStatic);
Connection c = DataSourceUtils.getConnection(dataSourceStatic);
try {
c.setAutoCommit(false);
} catch (SQLException e) {
throw new RuntimeException(e);
}
- System.out.println("c = " + c);
+// System.out.println("c = " + c);
return c;
}
diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java
index 9a968e0..6bc8960 100755
--- a/src/main/java/io/trygvis/spring/Config.java
+++ b/src/main/java/io/trygvis/spring/Config.java
@@ -1,15 +1,11 @@
package io.trygvis.spring;
import com.jolbox.bonecp.*;
-import io.trygvis.MyJob;
import io.trygvis.model.*;
import org.hibernate.*;
import org.hibernate.annotations.*;
import org.hibernate.cfg.*;
import org.hibernate.ejb.*;
-import org.quartz.*;
-import org.quartz.impl.jdbcjobstore.*;
-import org.quartz.spi.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.context.annotation.*;
import org.springframework.context.annotation.Configuration;
@@ -18,7 +14,6 @@ import org.springframework.data.jpa.repository.config.*;
import org.springframework.jdbc.datasource.*;
import org.springframework.orm.hibernate4.*;
import org.springframework.orm.jpa.*;
-import org.springframework.scheduling.quartz.*;
import org.springframework.transaction.*;
import org.springframework.transaction.annotation.*;
import org.springframework.transaction.support.*;
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 038a2f1..5fa174f 100755
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -9,6 +9,7 @@
<logger name="org.springframework.jdbc.core.JdbcTemplate" level="DEBUG"/>
<logger name="org.hibernate" level="DEBUG"/>
<logger name="org.quartz" level="DEBUG"/>
+ <logger name="org.quartz.impl.jdbcjobstore.SimpleSemaphore" level="INFO"/>
<!--
<logger name="org" level="INFO"/>
<logger name="net" level="INFO"/>