From 8477e8a888d584cf1352a4b69d7cefb2b7cd9ace Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 14 Apr 2013 18:04:50 +0200 Subject: o Dropping quartz, using straight executor service. --- .../java/io/trygvis/CreateArticleCallable.java | 13 +- src/main/java/io/trygvis/Main.java | 21 +- src/main/java/io/trygvis/UpdateArticeCallable.java | 43 --- .../java/io/trygvis/UpdateArticleCallable.java | 44 +++ src/main/java/io/trygvis/data/TaskRepository.java | 4 + src/main/java/io/trygvis/model/Queue.java | 12 + src/main/java/io/trygvis/model/Task.java | 74 ++++- src/main/java/io/trygvis/queue/AsyncService.java | 4 +- .../java/io/trygvis/queue/JpaAsyncService.java | 306 ++++++++++----------- src/main/java/io/trygvis/spring/Config.java | 2 +- src/main/resources/logback.xml | 22 +- 11 files changed, 301 insertions(+), 244 deletions(-) delete mode 100755 src/main/java/io/trygvis/UpdateArticeCallable.java create mode 100755 src/main/java/io/trygvis/UpdateArticleCallable.java diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java index e727b46..85734fc 100755 --- a/src/main/java/io/trygvis/CreateArticleCallable.java +++ b/src/main/java/io/trygvis/CreateArticleCallable.java @@ -6,21 +6,28 @@ import org.slf4j.*; import org.springframework.stereotype.*; import org.springframework.transaction.annotation.*; -import java.util.*; import javax.persistence.*; +import java.util.*; import static org.springframework.transaction.annotation.Propagation.*; -@Component +@Component("createArticle") +@Transactional(propagation = MANDATORY) public class CreateArticleCallable implements AsyncService.AsyncCallable { private final Logger log = LoggerFactory.getLogger(getClass()); @PersistenceContext private EntityManager entityManager; -// @Transactional(propagation = REQUIRES_NEW) + private Random random = new Random(); + public void run() throws Exception { log.info("CreateArticeJob.run: BEGIN"); + + if (random.nextBoolean()) { + throw new RuntimeException("failing create article"); + } + Date now = new Date(); log.info("now = {}", now); diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index aafb728..448c3e4 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -7,12 +7,12 @@ import org.slf4j.bridge.*; import org.springframework.beans.factory.annotation.*; import org.springframework.context.support.*; import org.springframework.stereotype.*; -import org.springframework.transaction.annotation.*; + +import java.util.*; import static java.lang.System.*; @Component -@Transactional public class Main { private static final Logger log = LoggerFactory.getLogger(Main.class); @@ -23,7 +23,7 @@ public class Main { setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); setProperty("database.username", username); setProperty("database.password", username); - setProperty("hibernate.showSql", "true"); +// setProperty("hibernate.showSql", "true"); setProperty("hibernate.hbm2ddl.auto", "create"); // create setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName()); @@ -50,19 +50,26 @@ public class Main { private AsyncService asyncService; @Autowired - private CreateArticleCallable createArticleCallable; + @Qualifier("createArticle") + private AsyncService.AsyncCallable createArticleCallable; @Autowired - private UpdateArticeCallable updateArticeCallable; + @Qualifier("createArticle") + private AsyncService.AsyncCallable/*UpdateArticleCallable*/ updateArticleCallable; public void run() throws Exception { log.info("Main.run"); - asyncService.registerQueue("create-queue", 1, createArticleCallable); + JpaAsyncService.JpaQueueRef queueRef = asyncService.registerQueue("create-queue", 10, createArticleCallable); + log.info("queue registered: ref = {}", queueRef); // asyncService.registerQueue("update-queue", 1, updateArticeCallable); AsyncService.QueueRef queue = asyncService.getQueue("create-queue"); - AsyncService.ExecutionRef executionRef = asyncService.schedule(queue); + List refs = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + refs.add(asyncService.schedule(queue)); + } } } diff --git a/src/main/java/io/trygvis/UpdateArticeCallable.java b/src/main/java/io/trygvis/UpdateArticeCallable.java deleted file mode 100755 index d022655..0000000 --- a/src/main/java/io/trygvis/UpdateArticeCallable.java +++ /dev/null @@ -1,43 +0,0 @@ -package io.trygvis; - -import io.trygvis.model.*; -import io.trygvis.queue.*; -import org.slf4j.*; -import org.springframework.stereotype.*; -import org.springframework.transaction.annotation.*; - -import java.util.*; -import javax.persistence.*; - -import static org.springframework.transaction.annotation.Propagation.*; - -@Component -public class UpdateArticeCallable implements AsyncService.AsyncCallable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final Random r = new Random(); - - @PersistenceContext - private EntityManager entityManager; - -// @Transactional(propagation = REQUIRES_NEW) - public void run() throws Exception { - log.info("UpdateArticeJob.run: BEGIN"); - - Date now = new Date(); - - log.info("now = {}", now); - - TypedQuery
q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); - - List
list = q.getResultList(); - log.info("Got {} articles", list.size()); - - Article a = list.get(r.nextInt(list.size())); - a.setUpdated(new Date()); - - entityManager.persist(a); - - log.info("UpdateArticeJob.run: END"); - } -} diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java new file mode 100755 index 0000000..7ed8b63 --- /dev/null +++ b/src/main/java/io/trygvis/UpdateArticleCallable.java @@ -0,0 +1,44 @@ +package io.trygvis; + +import io.trygvis.model.*; +import io.trygvis.queue.*; +import org.slf4j.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.annotation.*; + +import java.util.*; +import javax.persistence.*; + +import static org.springframework.transaction.annotation.Propagation.*; + +@Component +public class UpdateArticleCallable + implements AsyncService.AsyncCallable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final Random r = new Random(); + + @PersistenceContext + private EntityManager entityManager; + +// @Transactional(propagation = REQUIRES_NEW) + public void run() throws Exception { + log.info("UpdateArticeJob.run: BEGIN"); + + Date now = new Date(); + + log.info("now = {}", now); + + TypedQuery
q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); + + List
list = q.getResultList(); + log.info("Got {} articles", list.size()); + + Article a = list.get(r.nextInt(list.size())); + a.setUpdated(new Date()); + + entityManager.persist(a); + + log.info("UpdateArticeJob.run: END"); + } +} diff --git a/src/main/java/io/trygvis/data/TaskRepository.java b/src/main/java/io/trygvis/data/TaskRepository.java index 0b65199..e24d520 100755 --- a/src/main/java/io/trygvis/data/TaskRepository.java +++ b/src/main/java/io/trygvis/data/TaskRepository.java @@ -1,7 +1,11 @@ package io.trygvis.data; import io.trygvis.model.*; +import io.trygvis.model.Queue; import org.springframework.data.jpa.repository.*; +import java.util.*; + public interface TaskRepository extends JpaRepository { + List findByQueueAndCompletedIsNull(Queue queue); } diff --git a/src/main/java/io/trygvis/model/Queue.java b/src/main/java/io/trygvis/model/Queue.java index 52c5c0f..aeec405 100755 --- a/src/main/java/io/trygvis/model/Queue.java +++ b/src/main/java/io/trygvis/model/Queue.java @@ -19,6 +19,10 @@ public class Queue { private long interval; + @SuppressWarnings("UnusedDeclaration") + private Queue() { + } + public Queue(String name, long interval) { this.name = name; this.interval = interval; @@ -43,4 +47,12 @@ public class Queue { public void setInterval(long interval) { this.interval = interval; } + + public String toString() { + return "Queue{" + + "id=" + id + + ", name='" + name + '\'' + + ", interval=" + interval + + '}'; + } } diff --git a/src/main/java/io/trygvis/model/Task.java b/src/main/java/io/trygvis/model/Task.java index fa44e26..e86b623 100755 --- a/src/main/java/io/trygvis/model/Task.java +++ b/src/main/java/io/trygvis/model/Task.java @@ -1,6 +1,8 @@ package io.trygvis.model; import javax.persistence.*; +import java.util.*; +import java.util.regex.*; @Entity public class Task { @@ -12,21 +14,81 @@ public class Task { @ManyToOne private Queue queue; - private Long long1; + private Date scheduled; - public Task(Queue queue) { + private Date lastRun; + + private int runCount; + + private Date completed; + + private String arguments; + + private static final Pattern pattern = Pattern.compile(" "); + + @SuppressWarnings("UnusedDeclaration") + private Task() { + } + + public Task(Queue queue, Date scheduled, String... arguments) { this.queue = queue; + this.scheduled = scheduled; + + StringBuilder builder = new StringBuilder(arguments.length * 100); + for (String argument : arguments) { + if (pattern.matcher(argument).matches()) { + throw new RuntimeException("Bad argument: '" + argument + "'."); + } + builder.append(argument).append(' '); + } + this.arguments = builder.toString(); } public Integer getId() { return id; } - public Long getLong1() { - return long1; + public String[] getArguments() { + return arguments.split(" "); + } + + public Date getScheduled() { + return scheduled; + } + + public Date getLastRun() { + return lastRun; + } + + public int getRunCount() { + return runCount; + } + + public Date getCompleted() { + return completed; + } + + public void setCompleted(Date completed) { + this.completed = completed; + } + + public void registerRun() { + lastRun = new Date(); + runCount++; + } + + public void registerComplete(Date completed) { + this.completed = completed; } - public void setLong1(Long long1) { - this.long1 = long1; + public String toString() { + return "Task{" + + "id=" + id + + ", queue=" + queue + + ", scheduled=" + scheduled + + ", lastRun=" + lastRun + + ", completed=" + completed + + ", arguments='" + arguments + '\'' + + '}'; } } diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index de0a1af..f792f5e 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -4,11 +4,11 @@ import org.quartz.*; public interface AsyncService { - void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; QueueRef getQueue(String name); - ExecutionRef schedule(QueueRef queue); + ExecutionRef schedule(QueueRef queue, String... args); interface QueueRef { } diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java index e715c6d..95d5ef3 100755 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java @@ -1,61 +1,36 @@ package io.trygvis.queue; -import io.trygvis.data.QueueRepository; -import io.trygvis.data.TaskRepository; +import io.trygvis.data.*; import io.trygvis.model.Queue; -import org.quartz.Job; -import org.quartz.JobDataMap; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.JobKey; -import org.quartz.JobPersistenceException; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.SchedulerFactory; -import org.quartz.SimpleScheduleBuilder; -import org.quartz.SimpleTrigger; -import org.quartz.TriggerBuilder; -import org.quartz.impl.JobDetailImpl; -import org.quartz.impl.StdSchedulerFactory; -import org.quartz.impl.jdbcjobstore.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.jdbc.datasource.DataSourceUtils; -import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.annotation.Transactional; - -import javax.annotation.*; -import javax.persistence.EntityManager; -import javax.persistence.PersistenceContext; -import javax.persistence.RollbackException; -import javax.persistence.TypedQuery; -import javax.sql.DataSource; -import java.sql.Connection; -import java.sql.SQLException; +import io.trygvis.model.*; +import org.quartz.*; +import org.slf4j.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.annotation.*; +import org.springframework.transaction.support.*; + +import javax.persistence.*; import java.util.*; +import java.util.concurrent.*; -import static org.quartz.SimpleScheduleBuilder.simpleSchedule; +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; @SuppressWarnings("SpringJavaAutowiringInspection") @Component -public class JpaAsyncService implements AsyncService, - ApplicationContextAware { +public class JpaAsyncService implements AsyncService/*, ApplicationContextAware*/ { private final Logger log = LoggerFactory.getLogger(getClass()); + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + @PersistenceContext private EntityManager entityManager; @Autowired - private DataSource dataSource; - - @Autowired - private PlatformTransactionManager transactionManager; + private TransactionTemplate transactionTemplate; @Autowired private QueueRepository queueRepository; @@ -63,59 +38,17 @@ public class JpaAsyncService implements AsyncService callables = new ArrayList<>(); - } - - private final Context context = new Context(); - - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.context.applicationContext = applicationContext; - } - - @PostConstruct - public void postConstruct() throws Exception { - transactionManagerStatic = transactionManager; - dataSourceStatic = dataSource; - log.info("afterPropertiesSet!!"); - Properties quartzProperties = new Properties(); - quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JobStoreTX.class.getName()); - quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, "queue"); - quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"); - quartzProperties.setProperty(StdSchedulerFactory.PROP_SCHED_SKIP_UPDATE_CHECK, "true"); -// quartzProperties.setProperty(StdSchedulerFactory.PROP_DATASOURCE_PREFIX, "wat"); - quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, JpaDataSourceJobStore.class.getName()); - quartzProperties.setProperty(StdSchedulerFactory.PROP_JOB_STORE_LOCK_HANDLER_CLASS, SimpleSemaphore.class.getName()); - quartzProperties.setProperty("org.quartz.threadPool.threadCount", "10"); - quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); - quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); - SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties); - - scheduler = schedulerFactory.getScheduler(); - scheduler.getContext().put("context", context); - scheduler.start(); - } - - @PreDestroy - public void preDestroy() throws Exception { - scheduler.shutdown(); - } - @Transactional - public synchronized void registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { + public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); Queue q = queueRepository.findByName(name); + log.info("q = {}", q); + if (q == null) { - Queue queue = new Queue(name, interval); - queueRepository.save(queue); + q = new Queue(name, interval); + q = queueRepository.save(q); } else { boolean dirty = false; if (interval != q.getInterval()) { @@ -124,133 +57,170 @@ public class JpaAsyncService implements AsyncService query = entityManager.createQuery("select q from io.trygvis.model.Queue q where name = ?1", Queue.class); - query.setParameter(1, name); - List list = query.getResultList(); - System.out.println("list.size() = " + list.size()); + Queue queue = queueRepository.findByName(name); - if (list.size() == 0) { + if (queue == null) { throw new RollbackException("No such queue: '" + name + "'."); } - Queue queue = list.get(0); - - entityManager.detach(query); + entityManager.detach(queue); return new JpaQueueRef(queue); } @Transactional - public JpaExecutionRef schedule(JpaQueueRef queue) { - return null; + public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { + log.info("schedule: ENTER"); + Date scheduled = new Date(); + Task task = new Task(queue.queue, scheduled, args); + log.info("task = {}", task); + taskRepository.save(task); + log.info("task = {}", task); +// entityManager.detach(task); + log.info("schedule: LEAVE"); + return new JpaExecutionRef(task); } - static class JpaQueueRef implements AsyncService.QueueRef { + public static class JpaQueueRef implements AsyncService.QueueRef { public final Queue queue; JpaQueueRef(Queue queue) { this.queue = queue; } - } - static class JpaExecutionRef implements AsyncService.ExecutionRef { + public String toString() { + return "JpaQueueRef{" + + "queue=" + queue + + '}'; + } } - public static class AsyncServiceJob implements Job { - private final Logger log = LoggerFactory.getLogger(getClass()); + public static class JpaExecutionRef implements AsyncService.ExecutionRef { + private final Task task; - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - try { - log.info("Running"); + public JpaExecutionRef(Task task) { + this.task = task; + } - Context context = (Context) jobExecutionContext.getScheduler().getContext().get("context"); + public String toString() { + return "JpaExecutionRef{" + + "task=" + task + + '}'; + } + } - for (Map.Entry entry : jobExecutionContext.getMergedJobDataMap().entrySet()) { - log.info("{} = {}, class= {}", entry.getKey(), entry.getValue(), entry.getValue() != null ? entry.getValue().getClass() : ""); - } + private class CheckTimerTask implements Runnable { + private final AsyncCallable callable; + private final JpaQueueRef queueRef; + + private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { + this.callable = callable; + this.queueRef = queueRef; + } + + public void run() { + log.info("JpaAsyncService$CheckTimerTask.run"); - int index = jobExecutionContext.getMergedJobDataMap().getIntFromString("index"); - AsyncCallable callable = context.callables.get(index); + List tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); - log.info("Calling {}", callable); - callable.run(); + System.out.println("tasks.size() = " + tasks.size()); - } catch (Throwable e) { - log.warn("fail", e); -// throw new JobExecutionException(e, false); + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while execution tasks.", e); } } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + task.registerRun(); + taskRepository.save(task); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + log.info("Setting completed on task. date = {}, task = {}", completed, task); + task.registerComplete(completed); + taskRepository.save(task); + } catch (Exception e) { + throw new RuntimeException("Error while executing callback", e); + } + } + }); + } } - public static class JpaDataSourceJobStore extends JobStoreSupport { + private class MyTransactionSynchronization implements TransactionSynchronization { + + private final AsyncCallable callable; - public JpaDataSourceJobStore() { - setDataSource("wat"); + private final int interval; + + private final JpaQueueRef queueRef; + + public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { + this.callable = callable; + this.interval = interval; + this.queueRef = queueRef; } - protected Connection getNonManagedTXConnection() throws JobPersistenceException { -// DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); -// definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); -// TransactionStatus transaction = transactionManagerStatic.getTransaction(definition); + public void suspend() { + } -// System.out.println("dataSourceStatic = " + dataSourceStatic); - Connection c = DataSourceUtils.getConnection(dataSourceStatic); - try { - c.setAutoCommit(false); - } catch (SQLException e) { - throw new RuntimeException(e); - } -// System.out.println("c = " + c); + public void resume() { + } - return c; + public void flush() { } - protected void closeConnection(Connection c) { - try { - DataSourceUtils.doCloseConnection(c, dataSourceStatic); - } catch (SQLException e) { - e.printStackTrace(); - } + public void beforeCommit(boolean readOnly) { + } + public void beforeCompletion() { } - protected Object executeInLock(String lockName, TransactionCallback txCallback) throws JobPersistenceException { - return executeInNonManagedTXLock(lockName, txCallback); + public void afterCommit() { + } + + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); + } } } } diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java index 6bc8960..5df4dac 100755 --- a/src/main/java/io/trygvis/spring/Config.java +++ b/src/main/java/io/trygvis/spring/Config.java @@ -119,7 +119,7 @@ public class Config { @Bean public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource, @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl, - @Value("${hibernate.showSql}") boolean showSql, + @Value("${hibernate.showSql:false}") boolean showSql, @Value("${hibernate.dialect}") String dialect) { LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean(); x.setDataSource(dataSource); diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 5fa174f..39e3dbd 100755 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -1,21 +1,15 @@ - ---> - - - - - - - - + + + + + + + -- cgit v1.2.3