diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-11 19:12:27 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-11 19:12:27 +0200 |
commit | b59f08bb5b08be7972086037ec54b23ea9fb49f8 (patch) | |
tree | ef168ed7da2aa9fe8766a0d1625aa40e36a8ad99 /src/main | |
parent | cd99d57cccb88ea8a058eca530d62a81a665983c (diff) | |
download | quartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.tar.gz quartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.tar.bz2 quartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.tar.xz quartz-based-queue-b59f08bb5b08be7972086037ec54b23ea9fb49f8.zip |
wip
Diffstat (limited to 'src/main')
-rwxr-xr-x | src/main/java/io/trygvis/CreateArticleCallable.java | 33 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/Main.java | 12 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/MyJob.java | 38 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/UpdateArticeCallable.java | 43 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/AsyncService.java | 6 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/JpaAsyncService.java | 94 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/spring/Config.java | 5 | ||||
-rwxr-xr-x | src/main/resources/logback.xml | 1 |
8 files changed, 143 insertions, 89 deletions
diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java new file mode 100755 index 0000000..e727b46 --- /dev/null +++ b/src/main/java/io/trygvis/CreateArticleCallable.java @@ -0,0 +1,33 @@ +package io.trygvis; + +import io.trygvis.model.*; +import io.trygvis.queue.*; +import org.slf4j.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.annotation.*; + +import java.util.*; +import javax.persistence.*; + +import static org.springframework.transaction.annotation.Propagation.*; + +@Component +public class CreateArticleCallable implements AsyncService.AsyncCallable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + @PersistenceContext + private EntityManager entityManager; + +// @Transactional(propagation = REQUIRES_NEW) + public void run() throws Exception { + log.info("CreateArticeJob.run: BEGIN"); + Date now = new Date(); + + log.info("now = {}", now); + + Article article = new Article(new Date(), null, "title", "body"); + entityManager.persist(article); + + log.info("CreateArticeJob.run: END"); + } +} diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index 19167d7..aafb728 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -29,7 +29,6 @@ public class Main { log.info("Starting context");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
- context.start();
log.info("Started context");
try {
@@ -50,12 +49,19 @@ public class Main { @Autowired
private AsyncService<AsyncService.QueueRef, AsyncService.ExecutionRef> asyncService;
+ @Autowired
+ private CreateArticleCallable createArticleCallable;
+
+ @Autowired
+ private UpdateArticeCallable updateArticeCallable;
+
public void run() throws Exception {
log.info("Main.run");
- asyncService.registerQueue("test-queue", 1, MyJob.class);
+ asyncService.registerQueue("create-queue", 1, createArticleCallable);
+// asyncService.registerQueue("update-queue", 1, updateArticeCallable);
- AsyncService.QueueRef queue = asyncService.getQueue("test-queue");
+ AsyncService.QueueRef queue = asyncService.getQueue("create-queue");
AsyncService.ExecutionRef executionRef = asyncService.schedule(queue);
}
diff --git a/src/main/java/io/trygvis/MyJob.java b/src/main/java/io/trygvis/MyJob.java deleted file mode 100755 index 7303a33..0000000 --- a/src/main/java/io/trygvis/MyJob.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.trygvis;
-
-import io.trygvis.model.*;
-import org.quartz.*;
-import org.slf4j.*;
-
-import java.util.*;
-import javax.persistence.*;
-
-public class MyJob implements Job {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @PersistenceContext
- private EntityManager entityManager;
-
- public void execute(JobExecutionContext context) throws JobExecutionException {
- log.info("MyJob.execute: BEGIN");
- log.info("entityManager = {}", entityManager);
- log.info("context.getJobDetail().getKey() = {}", context.getJobDetail().getKey());
-
-/*
- Date now = new Date();
-
- log.info("now = {}", now);
-
-// Article article = entityManager.find(Article.class, 1);
-//
-// System.out.println("article = " + article);
-// article.setUpdated(now);
-// entityManager.persist(article);
-
- Article article = new Article(new Date(), null, "title", "body");
- entityManager.persist(article);
-*/
-
- log.info("MyJob.execute: END");
- }
-}
diff --git a/src/main/java/io/trygvis/UpdateArticeCallable.java b/src/main/java/io/trygvis/UpdateArticeCallable.java new file mode 100755 index 0000000..d022655 --- /dev/null +++ b/src/main/java/io/trygvis/UpdateArticeCallable.java @@ -0,0 +1,43 @@ +package io.trygvis; + +import io.trygvis.model.*; +import io.trygvis.queue.*; +import org.slf4j.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.annotation.*; + +import java.util.*; +import javax.persistence.*; + +import static org.springframework.transaction.annotation.Propagation.*; + +@Component +public class UpdateArticeCallable implements AsyncService.AsyncCallable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final Random r = new Random(); + + @PersistenceContext + private EntityManager entityManager; + +// @Transactional(propagation = REQUIRES_NEW) + public void run() throws Exception { + log.info("UpdateArticeJob.run: BEGIN"); + + Date now = new Date(); + + log.info("now = {}", now); + + TypedQuery<Article> q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); + + List<Article> list = q.getResultList(); + log.info("Got {} articles", list.size()); + + Article a = list.get(r.nextInt(list.size())); + a.setUpdated(new Date()); + + entityManager.persist(a); + + log.info("UpdateArticeJob.run: END"); + } +} diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index dcbe991..de0a1af 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -4,7 +4,7 @@ import org.quartz.*; public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
- void registerQueue(String name, int interval, Class klass) throws SchedulerException;
+ void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
QueueRef getQueue(String name);
@@ -15,4 +15,8 @@ public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionR interface ExecutionRef {
}
+
+ interface AsyncCallable {
+ void run() throws Exception;
+ }
}
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java index 2d6c2df..e715c6d 100755 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java @@ -10,36 +10,26 @@ import org.quartz.JobExecutionException; import org.quartz.JobKey;
import org.quartz.JobPersistenceException;
import org.quartz.Scheduler;
-import org.quartz.SchedulerContext;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerBuilder;
-import org.quartz.impl.DirectSchedulerFactory;
import org.quartz.impl.JobDetailImpl;
import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.jdbcjobstore.JobStoreSupport;
-import org.quartz.impl.jdbcjobstore.JobStoreTX;
-import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
-import org.quartz.spi.JobStore;
+import org.quartz.impl.jdbcjobstore.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
-import org.springframework.transaction.support.DefaultTransactionDefinition;
-import javax.annotation.PostConstruct;
+import javax.annotation.*;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.RollbackException;
@@ -47,14 +37,14 @@ import javax.persistence.TypedQuery; import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
@SuppressWarnings("SpringJavaAutowiringInspection")
@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef> {
+public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>,
+ ApplicationContextAware {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -64,13 +54,9 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef @Autowired
private DataSource dataSource;
- private static DataSource dataSourceStatic;
-
@Autowired
private PlatformTransactionManager transactionManager;
- private static PlatformTransactionManager transactionManagerStatic;
-
@Autowired
private QueueRepository queueRepository;
@@ -79,8 +65,23 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef private Scheduler scheduler;
+ private static DataSource dataSourceStatic;
+
+ private static PlatformTransactionManager transactionManagerStatic;
+
+ private static class Context {
+ ApplicationContext applicationContext;
+ List<AsyncCallable> callables = new ArrayList<>();
+ }
+
+ private final Context context = new Context();
+
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.context.applicationContext = applicationContext;
+ }
+
@PostConstruct
- public void afterPropertiesSet() throws Exception {
+ public void postConstruct() throws Exception {
transactionManagerStatic = transactionManager;
dataSourceStatic = dataSource;
log.info("afterPropertiesSet!!");
@@ -91,18 +92,24 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true");
// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat");
quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName());
+ quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_LOCK_HANDLER_CLASS, SimpleSemaphore.class.getName());
quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10");
quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");
SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties);
- Scheduler s = schedulerFactory.getScheduler();
- System.out.println("s.getSchedulerName() = " + s.getSchedulerName());
- scheduler = schedulerFactory.getScheduler("queue");
+ scheduler = schedulerFactory.getScheduler();
+ scheduler.getContext().put("context", context);
+ scheduler.start();
+ }
+
+ @PreDestroy
+ public void preDestroy() throws Exception {
+ scheduler.shutdown();
}
@Transactional
- public void registerQueue(String name, int interval, Class klass) throws SchedulerException {
+ public synchronized void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
Queue q = queueRepository.findByName(name);
@@ -121,16 +128,16 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef }
}
+ context.callables.add(callable);
+ int index = context.callables.size() - 1;
+
JobDetailImpl jobDetail = new JobDetailImpl();
JobKey jobKey = JobKey.jobKey("queue-" + name);
jobDetail.setKey(jobKey);
jobDetail.setJobClass(AsyncServiceJob.class);
jobDetail.setDurability(true);
- JobDataMap map = new JobDataMap();
- map.put("class", klass.getName());
- jobDetail.setJobDataMap(map);
-
+ jobDetail.getJobDataMap().put("index", Integer.toString(index));
scheduler.addJob(jobDetail, true);
SimpleScheduleBuilder schedule = simpleSchedule().
@@ -143,6 +150,10 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef forJob(jobKey).
build();
+ if(scheduler.checkExists(trigger.getKey())) {
+ scheduler.unscheduleJob(trigger.getKey());
+ }
+
scheduler.scheduleJob(trigger);
}
@@ -183,26 +194,25 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef public static class AsyncServiceJob implements Job {
private final Logger log = LoggerFactory.getLogger(getClass());
- public void execute(JobExecutionContext context) throws JobExecutionException {
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
log.info("Running");
- SchedulerContext map = context.getScheduler().getContext();
- ApplicationContext applicationContext = (ApplicationContext) map.get("applicationContext");
-
- log.info("applicationContext = {}", applicationContext);
+ Context context = (Context) jobExecutionContext.getScheduler().getContext().get("context");
- String className = map.getString("class");
+ for (Map.Entry<String, Object> entry : jobExecutionContext.getMergedJobDataMap().entrySet()) {
+ log.info("{} = {}, class= {}", entry.getKey(), entry.getValue(), entry.getValue() != null ? entry.getValue().getClass() : "<null>");
+ }
- log.info("className = {}", className);
+ int index = jobExecutionContext.getMergedJobDataMap().getIntFromString("index");
+ AsyncCallable callable = context.callables.get(index);
- Class klass = getClass().getClassLoader().loadClass(className);
- Object bean = applicationContext.getBean(klass);
+ log.info("Calling {}", callable);
+ callable.run();
- log.info("bean = {}", bean);
- } catch (Exception e) {
+ } catch (Throwable e) {
log.warn("fail", e);
- throw new JobExecutionException(e, false);
+// throw new JobExecutionException(e, false);
}
}
}
@@ -218,14 +228,14 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef // definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition);
- System.out.println("dataSourceStatic = " + dataSourceStatic);
+// System.out.println("dataSourceStatic = " + dataSourceStatic);
Connection c = DataSourceUtils.getConnection(dataSourceStatic);
try {
c.setAutoCommit(false);
} catch (SQLException e) {
throw new RuntimeException(e);
}
- System.out.println("c = " + c);
+// System.out.println("c = " + c);
return c;
}
diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java index 9a968e0..6bc8960 100755 --- a/src/main/java/io/trygvis/spring/Config.java +++ b/src/main/java/io/trygvis/spring/Config.java @@ -1,15 +1,11 @@ package io.trygvis.spring;
import com.jolbox.bonecp.*;
-import io.trygvis.MyJob;
import io.trygvis.model.*;
import org.hibernate.*;
import org.hibernate.annotations.*;
import org.hibernate.cfg.*;
import org.hibernate.ejb.*;
-import org.quartz.*;
-import org.quartz.impl.jdbcjobstore.*;
-import org.quartz.spi.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.context.annotation.*;
import org.springframework.context.annotation.Configuration;
@@ -18,7 +14,6 @@ import org.springframework.data.jpa.repository.config.*; import org.springframework.jdbc.datasource.*;
import org.springframework.orm.hibernate4.*;
import org.springframework.orm.jpa.*;
-import org.springframework.scheduling.quartz.*;
import org.springframework.transaction.*;
import org.springframework.transaction.annotation.*;
import org.springframework.transaction.support.*;
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 038a2f1..5fa174f 100755 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -9,6 +9,7 @@ <logger name="org.springframework.jdbc.core.JdbcTemplate" level="DEBUG"/>
<logger name="org.hibernate" level="DEBUG"/>
<logger name="org.quartz" level="DEBUG"/>
+ <logger name="org.quartz.impl.jdbcjobstore.SimpleSemaphore" level="INFO"/>
<!--
<logger name="org" level="INFO"/>
<logger name="net" level="INFO"/>
|