From a03d5154456587fc7920e632f083cc5f1e4318a9 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 20 Apr 2013 17:29:18 +0200 Subject: wip --- pom.xml | 218 ++++++------- .../java/io/trygvis/CreateArticleCallable.java | 11 +- src/main/java/io/trygvis/Main.java | 219 ++++++------- .../java/io/trygvis/UpdateArticleCallable.java | 26 +- src/main/java/io/trygvis/data/QueueRepository.java | 16 +- src/main/java/io/trygvis/data/TaskRepository.java | 22 +- src/main/java/io/trygvis/model/Article.java | 114 +++---- src/main/java/io/trygvis/queue/AsyncService.java | 58 ++-- .../java/io/trygvis/queue/JdbcAsyncService.java | 115 ++----- src/main/java/io/trygvis/queue/QueueThread.java | 116 +++++++ src/main/java/io/trygvis/queue/TaskDao.java | 11 +- .../io/trygvis/queue/TaskFailureException.java | 7 + src/main/java/io/trygvis/spring/Config.java | 353 ++++++++++----------- src/main/resources/applicationContext.xml | 44 +-- src/main/resources/logback.xml | 49 +-- 15 files changed, 719 insertions(+), 660 deletions(-) create mode 100644 src/main/java/io/trygvis/queue/QueueThread.java create mode 100644 src/main/java/io/trygvis/queue/TaskFailureException.java diff --git a/pom.xml b/pom.xml index d53bbc1..f6edf30 100755 --- a/pom.xml +++ b/pom.xml @@ -1,109 +1,109 @@ - - 4.0.0 - io.trygvis.2013.04 - quartz-based-queue - 1.0-SNAPSHOT - - - org.hibernate - hibernate-jpamodelgen - 1.2.0.Final - - - org.hibernate - hibernate-entitymanager - 4.1.9.Final - - - org.hibernate - hibernate-core - 4.1.9.Final - - - org.springframework - spring-context - 3.2.2.RELEASE - - - org.springframework - spring-context-support - 3.2.2.RELEASE - - - org.springframework - spring-orm - 3.2.2.RELEASE - - - org.springframework.data - spring-data-jpa - 1.2.0.RELEASE - - - com.jolbox - bonecp-spring - 0.7.1.RELEASE - - - org.quartz-scheduler - quartz - 2.1.7 - - - c3p0 - c3p0 - - - - - com.google.guava - guava - 14.0.1 - - - ch.qos.logback - logback-classic - 1.0.9 - - - org.slf4j - jul-to-slf4j - 1.7.2 - - - org.slf4j - jcl-over-slf4j - 1.7.2 - - - org.postgresql - postgresql - 9.2-1002-jdbc4 - - - - - - org.slf4j - slf4j-api - 1.7.2 - - - org.slf4j - log4j-over-slf4j - 1.7.2 - - - - - - - maven-compiler-plugin - - 1.7 - 1.7 - - - - - + + 4.0.0 + io.trygvis.2013.04 + quartz-based-queue + 1.0-SNAPSHOT + + + org.hibernate + hibernate-jpamodelgen + 1.2.0.Final + + + org.hibernate + hibernate-entitymanager + 4.1.9.Final + + + org.hibernate + hibernate-core + 4.1.9.Final + + + org.springframework + spring-context + 3.2.2.RELEASE + + + org.springframework + spring-context-support + 3.2.2.RELEASE + + + org.springframework + spring-orm + 3.2.2.RELEASE + + + org.springframework.data + spring-data-jpa + 1.2.0.RELEASE + + + com.jolbox + bonecp-spring + 0.7.1.RELEASE + + + org.quartz-scheduler + quartz + 2.1.7 + + + c3p0 + c3p0 + + + + + com.google.guava + guava + 14.0.1 + + + ch.qos.logback + logback-classic + 1.0.9 + + + org.slf4j + jul-to-slf4j + 1.7.2 + + + org.slf4j + jcl-over-slf4j + 1.7.2 + + + org.postgresql + postgresql + 9.2-1002-jdbc4 + + + + + + org.slf4j + slf4j-api + 1.7.2 + + + org.slf4j + log4j-over-slf4j + 1.7.2 + + + + + + + maven-compiler-plugin + + 1.7 + 1.7 + + + + + diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java index 85734fc..671f3dc 100755 --- a/src/main/java/io/trygvis/CreateArticleCallable.java +++ b/src/main/java/io/trygvis/CreateArticleCallable.java @@ -6,7 +6,6 @@ import org.slf4j.*; import org.springframework.stereotype.*; import org.springframework.transaction.annotation.*; -import javax.persistence.*; import java.util.*; import static org.springframework.transaction.annotation.Propagation.*; @@ -16,15 +15,15 @@ import static org.springframework.transaction.annotation.Propagation.*; public class CreateArticleCallable implements AsyncService.AsyncCallable { private final Logger log = LoggerFactory.getLogger(getClass()); - @PersistenceContext - private EntityManager entityManager; +// @PersistenceContext +// private EntityManager entityManager; private Random random = new Random(); - public void run() throws Exception { + public void run(List arguments) throws Exception { log.info("CreateArticeJob.run: BEGIN"); - if (random.nextBoolean()) { + if (random.nextInt() % 3 == 0) { throw new RuntimeException("failing create article"); } @@ -33,7 +32,7 @@ public class CreateArticleCallable implements AsyncService.AsyncCallable { log.info("now = {}", now); Article article = new Article(new Date(), null, "title", "body"); - entityManager.persist(article); +// entityManager.persist(article); log.info("CreateArticeJob.run: END"); } diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index f2f540f..f1bba26 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -1,108 +1,111 @@ -package io.trygvis; - -import io.trygvis.queue.*; -import io.trygvis.queue.Queue; -import org.hibernate.dialect.*; -import org.slf4j.*; -import org.slf4j.bridge.*; -import org.springframework.beans.factory.annotation.*; -import org.springframework.context.support.*; -import org.springframework.stereotype.*; -import org.springframework.transaction.*; -import org.springframework.transaction.support.*; - -import java.util.*; - -import static java.lang.System.*; -import static java.lang.Thread.*; -import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; - -@Component -public class Main { - private static final Logger log = LoggerFactory.getLogger(Main.class); - - public static void main(String[] args) throws Exception { - SLF4JBridgeHandler.install(); - - String username = getProperty("user.name"); - setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); - setProperty("database.username", username); - setProperty("database.password", username); -// setProperty("hibernate.showSql", "true"); - setProperty("hibernate.hbm2ddl.auto", "create"); // create - setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName()); - - log.info("Starting context"); - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); - log.info("Started context"); - - try { - context.getBean(Main.class).run(); - log.info("Sleeping"); - sleep(1000 * 1000); - } catch (Exception e) { - e.printStackTrace(System.out); - } - - log.info("Stopping context"); - context.stop(); - log.info("Stopped context"); - - exit(0); - } - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private AsyncService asyncService; - - @Autowired - @Qualifier("createArticle") - private AsyncService.AsyncCallable createArticleCallable; - - @Autowired - @Qualifier("createArticle") - private AsyncService.AsyncCallable/*UpdateArticleCallable*/ updateArticleCallable; - - public void run() throws Exception { - log.info("Main.run"); - - final Queue q = asyncService.registerQueue("create-queue", 10, createArticleCallable); -// log.info("queue registered: ref = {}", q); -// asyncService.registerQueue("update-queue", 1, updateArticeCallable); - -// q = asyncService.getQueue("create-queue"); - - final List tasks = new ArrayList<>(); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - for (int i = 0; i < 1; i++) { - tasks.add(asyncService.schedule(q)); - } - } - }); - - while (true) { - sleep(10000); - - log.info("tasks.size = {}", tasks.size()); - for (Iterator iterator = tasks.iterator(); iterator.hasNext(); ) { - Task task = iterator.next(); - - task = asyncService.update(task); - - log.info("task = {}", task); - - if (task.isDone()) { - iterator.remove(); - } - } - - if (tasks.isEmpty()) { - break; - } - } - } -} +package io.trygvis; + +import io.trygvis.queue.*; +import io.trygvis.queue.Queue; +import org.hibernate.dialect.*; +import org.slf4j.*; +import org.slf4j.bridge.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.context.support.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.support.*; + +import java.util.*; + +import static java.lang.System.*; +import static java.lang.Thread.*; + +@Component +public class Main { + private static final Logger log = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) throws Exception { + SLF4JBridgeHandler.install(); + + String username = getProperty("user.name"); + setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); + setProperty("database.username", username); + setProperty("database.password", username); +// setProperty("hibernate.showSql", "true"); + setProperty("hibernate.hbm2ddl.auto", "create"); // create + setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName()); + + log.info("Starting context"); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); + log.info("Started context"); + + try { + context.getBean(Main.class).run(); +// log.info("Sleeping"); +// sleep(1000 * 1000); + } catch (Exception e) { + e.printStackTrace(System.out); + } + + log.info("Stopping context"); + context.stop(); + log.info("Stopped context"); + + exit(0); + } + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private AsyncService asyncService; + + @Autowired + @Qualifier("createArticle") + private AsyncService.AsyncCallable createArticleCallable; + + @Autowired + @Qualifier("updateArticle") + private AsyncService.AsyncCallable updateArticleCallable; + + public void run() throws Exception { + log.info("Main.run"); + + final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable); +// log.info("queue registered: ref = {}", q); +// asyncService.registerQueue("update-queue", 1, updateArticleCallable); + +// q = asyncService.getQueue("create-queue"); + + final List tasks = new ArrayList<>(); + + final int count = 1; + log.info("Creating {} tasks", count); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + for (int i = 0; i < count; i++) { + tasks.add(asyncService.schedule(q)); + } + } + }); + log.info("Created {} tasks", count); + + while (true) { + sleep(10000); + + log.info("Checking for status of {} tasks", tasks.size()); + for (Iterator iterator = tasks.iterator(); iterator.hasNext(); ) { + Task task = iterator.next(); + + task = asyncService.update(task); + +// log.info("task = {}", task); + + if (task.isDone()) { + iterator.remove(); + } + } + + if (tasks.isEmpty()) { + log.info("No more tasks"); + break; + } + } + } +} diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java index 7ed8b63..f1ea0e2 100755 --- a/src/main/java/io/trygvis/UpdateArticleCallable.java +++ b/src/main/java/io/trygvis/UpdateArticleCallable.java @@ -1,34 +1,33 @@ package io.trygvis; -import io.trygvis.model.*; -import io.trygvis.queue.*; -import org.slf4j.*; -import org.springframework.stereotype.*; -import org.springframework.transaction.annotation.*; +import io.trygvis.queue.AsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; -import java.util.*; -import javax.persistence.*; +import java.util.Date; +import java.util.List; +import java.util.Random; -import static org.springframework.transaction.annotation.Propagation.*; - -@Component +@Component("updateArticle") public class UpdateArticleCallable implements AsyncService.AsyncCallable { private final Logger log = LoggerFactory.getLogger(getClass()); private final Random r = new Random(); - @PersistenceContext - private EntityManager entityManager; +// @PersistenceContext +// private EntityManager entityManager; // @Transactional(propagation = REQUIRES_NEW) - public void run() throws Exception { + public void run(List arguments) throws Exception { log.info("UpdateArticeJob.run: BEGIN"); Date now = new Date(); log.info("now = {}", now); +/* TypedQuery
q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); List
list = q.getResultList(); @@ -38,6 +37,7 @@ public class UpdateArticleCallable a.setUpdated(new Date()); entityManager.persist(a); +*/ log.info("UpdateArticeJob.run: END"); } diff --git a/src/main/java/io/trygvis/data/QueueRepository.java b/src/main/java/io/trygvis/data/QueueRepository.java index 47ed478..604d5fe 100755 --- a/src/main/java/io/trygvis/data/QueueRepository.java +++ b/src/main/java/io/trygvis/data/QueueRepository.java @@ -1,8 +1,8 @@ -package io.trygvis.data; - -import io.trygvis.queue.*; -import org.springframework.data.jpa.repository.*; - -public interface QueueRepository extends JpaRepository { - Queue findByName(String name); -} +package io.trygvis.data; + +import io.trygvis.queue.*; +import org.springframework.data.jpa.repository.*; + +public interface QueueRepository extends JpaRepository { + Queue findByName(String name); +} diff --git a/src/main/java/io/trygvis/data/TaskRepository.java b/src/main/java/io/trygvis/data/TaskRepository.java index b0710e3..efe4903 100755 --- a/src/main/java/io/trygvis/data/TaskRepository.java +++ b/src/main/java/io/trygvis/data/TaskRepository.java @@ -1,11 +1,11 @@ -package io.trygvis.data; - -import io.trygvis.queue.*; -import io.trygvis.queue.Queue; -import org.springframework.data.jpa.repository.*; - -import java.util.*; - -public interface TaskRepository extends JpaRepository { - List findByQueueAndCompletedIsNull(Queue queue); -} +package io.trygvis.data; + +import io.trygvis.queue.*; +import io.trygvis.queue.Queue; +import org.springframework.data.jpa.repository.*; + +import java.util.*; + +public interface TaskRepository extends JpaRepository { + List findByQueueAndCompletedIsNull(Queue queue); +} diff --git a/src/main/java/io/trygvis/model/Article.java b/src/main/java/io/trygvis/model/Article.java index 6383e50..d144217 100755 --- a/src/main/java/io/trygvis/model/Article.java +++ b/src/main/java/io/trygvis/model/Article.java @@ -1,57 +1,57 @@ -package io.trygvis.model; - -import java.util.Date; - -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; -import javax.persistence.Id; -import javax.persistence.SequenceGenerator; -import javax.persistence.Table; - -@Entity -public class Article { - @Id - @SequenceGenerator(name="id_seq", sequenceName="id_seq") - @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_seq") - private Integer id; - private Date created; - private Date updated; - private String title; - private String body; - - @SuppressWarnings("UnusedDeclaration") - private Article() { - } - - public Article(Date created, Date updated, String title, String body) { - this.created = created; - this.updated = updated; - this.title = title; - this.body = body; - } - - public Integer getId() { - return id; - } - - public Date getCreated() { - return created; - } - - public Date getUpdated() { - return updated; - } - - public void setUpdated(Date updated) { - this.updated = updated; - } - - public String getTitle() { - return title; - } - - public String getBody() { - return body; - } -} +package io.trygvis.model; + +import java.util.Date; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.SequenceGenerator; +import javax.persistence.Table; + +@Entity +public class Article { + @Id + @SequenceGenerator(name="id_seq", sequenceName="id_seq") + @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_seq") + private Integer id; + private Date created; + private Date updated; + private String title; + private String body; + + @SuppressWarnings("UnusedDeclaration") + private Article() { + } + + public Article(Date created, Date updated, String title, String body) { + this.created = created; + this.updated = updated; + this.title = title; + this.body = body; + } + + public Integer getId() { + return id; + } + + public Date getCreated() { + return created; + } + + public Date getUpdated() { + return updated; + } + + public void setUpdated(Date updated) { + this.updated = updated; + } + + public String getTitle() { + return title; + } + + public String getBody() { + return body; + } +} diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index 10f1b79..b42b550 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -1,28 +1,30 @@ -package io.trygvis.queue; - -import org.quartz.*; - -public interface AsyncService { - - /** - * @param name - * @param interval how often the queue should be polled for missed tasks in seconds. - * @param callable - * @return - * @throws SchedulerException - */ - Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; - - Queue getQueue(String name); - - Task schedule(Queue queue, String... args); - - /** - * Polls for a new state of the execution. - */ - Task update(Task ref); - - interface AsyncCallable { - void run() throws Exception; - } -} +package io.trygvis.queue; + +import org.quartz.*; + +import java.util.List; + +public interface AsyncService { + + /** + * @param name + * @param interval how often the queue should be polled for missed tasks in seconds. + * @param callable + * @return + * @throws SchedulerException + */ + Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + + Queue getQueue(String name); + + Task schedule(Queue queue, String... args); + + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); + + interface AsyncCallable { + void run(List arguments) throws Exception; + } +} diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java index 1df0ab6..a8f581e 100644 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -4,7 +4,6 @@ import org.quartz.*; import org.slf4j.*; import org.springframework.beans.factory.annotation.*; import org.springframework.stereotype.*; -import org.springframework.transaction.*; import org.springframework.transaction.annotation.*; import org.springframework.transaction.support.*; @@ -43,7 +42,7 @@ public class JdbcAsyncService implements AsyncService { final long interval_; if (q == null) { - q = new Queue(name, interval * 1000); + q = new Queue(name, interval); queueDao.insert(q); interval_ = interval; } else { @@ -51,18 +50,19 @@ public class JdbcAsyncService implements AsyncService { interval_ = q.interval; } - final QueueThread queueThread = new QueueThread(q, callable); + final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); queues.put(name, queueThread); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { - log.info("status = {}", status); + log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } - }, 1000, 1000 * interval_, MILLISECONDS); + }, 10, interval_, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); @@ -85,9 +85,7 @@ public class JdbcAsyncService implements AsyncService { } @Transactional(propagation = REQUIRED) - public Task schedule(Queue queue, String... args) { - log.info("schedule: ENTER"); - + public Task schedule(final Queue queue, String... args) { Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); @@ -97,15 +95,22 @@ public class JdbcAsyncService implements AsyncService { long id = taskDao.insert(queue.name, scheduled, arguments.toString()); Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); - log.info("task = {}", task); - queues.get(queue.name).ping(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } + log.info("Created task = {}", task); +// queues.get(queue.name).ping(); +// try { +// Thread.sleep(500); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queues.get(queue.name).ping(); + } + } + }); - log.info("schedule: LEAVE"); return task; } @@ -113,82 +118,4 @@ public class JdbcAsyncService implements AsyncService { public Task update(Task ref) { return taskDao.findById(ref.id); } - - class QueueThread implements Runnable { - public boolean shouldRun = true; - - public final Queue queue; - - private final AsyncCallable callable; - - QueueThread(Queue queue, AsyncCallable callable) { - this.queue = queue; - this.callable = callable; - } - - public void ping() { - log.info("Sending ping to " + queue); - synchronized (this) { - notify(); - } - } - - public void run() { - while (shouldRun) { - List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - - synchronized (this) { - try { - wait(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - taskDao.update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } } diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java new file mode 100644 index 0000000..a981c7e --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueThread.java @@ -0,0 +1,116 @@ +package io.trygvis.queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.List; + +class QueueThread implements Runnable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean busy; + + public final Queue queue; + + private final TaskDao taskDao; + + private final TransactionTemplate transactionTemplate; + + private final AsyncService.AsyncCallable callable; + + QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { + this.queue = queue; + this.taskDao = taskDao; + this.transactionTemplate = transactionTemplate; + this.callable = callable; + } + + public void ping() { + synchronized (this) { + if (!busy) { + log.info("Sending ping to " + queue); + notify(); + } else { + checkForNewTasks = true; + } + } + } + + public void run() { + while (shouldRun) { + try { + List tasks = transactionTemplate.execute(new TransactionCallback>() { + public List doInTransaction(TransactionStatus status) { + return taskDao.findByNameAndCompletedIsNull(queue.name); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + if(tasks.size() > 0) { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } + } catch (Throwable e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + busy = false; + + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + continue; + } + + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + + busy = true; + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(task.arguments); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 2e407a5..2bf2145 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -3,12 +3,16 @@ package io.trygvis.queue; import org.springframework.beans.factory.annotation.*; import org.springframework.jdbc.core.*; import org.springframework.stereotype.*; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import java.sql.*; import java.util.Date; import java.util.*; import static java.util.Arrays.*; +import static org.springframework.transaction.annotation.Propagation.MANDATORY; +import static org.springframework.transaction.annotation.Propagation.REQUIRED; @Component public class TaskDao { @@ -16,22 +20,26 @@ public class TaskDao { @Autowired private JdbcTemplate jdbcTemplate; + @Transactional(propagation = MANDATORY) public long insert(String queue, Date scheduled, String arguments) { jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " + "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments); return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); } + @Transactional(propagation = MANDATORY) public Task findById(long id) { return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", new TaskRowMapper(), id); } + @Transactional(propagation = MANDATORY) public List findByNameAndCompletedIsNull(String name) { return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", new TaskRowMapper(), name); } + @Transactional(propagation = MANDATORY) public void update(Task task) { jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", task.scheduled, task.lastRun, task.runCount, task.completed, task.id); @@ -41,6 +49,7 @@ public class TaskDao { public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments"; public Task mapRow(ResultSet rs, int rowNum) throws SQLException { + String arguments = rs.getString(7); return new Task( rs.getLong(1), rs.getString(2), @@ -48,7 +57,7 @@ public class TaskDao { rs.getTimestamp(4), rs.getInt(5), rs.getTimestamp(6), - asList(rs.getString(7).split(" "))); + arguments != null ? asList(arguments.split(" ")) : Collections.emptyList()); } } } diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/queue/TaskFailureException.java new file mode 100644 index 0000000..d3d8c48 --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } +} diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java index 5dd845f..ca75049 100755 --- a/src/main/java/io/trygvis/spring/Config.java +++ b/src/main/java/io/trygvis/spring/Config.java @@ -1,179 +1,174 @@ -package io.trygvis.spring; - -import com.jolbox.bonecp.*; -import io.trygvis.model.*; -import org.hibernate.*; -import org.hibernate.annotations.*; -import org.hibernate.cfg.*; -import org.hibernate.ejb.*; -import org.springframework.beans.factory.annotation.*; -import org.springframework.context.annotation.*; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.support.*; -import org.springframework.data.jpa.repository.config.*; -import org.springframework.jdbc.core.*; -import org.springframework.jdbc.datasource.*; -import org.springframework.orm.hibernate4.*; -import org.springframework.orm.jpa.*; -import org.springframework.transaction.*; -import org.springframework.transaction.annotation.*; -import org.springframework.transaction.support.*; - -import javax.persistence.*; -import javax.sql.*; -import java.util.*; - -import static org.hibernate.cfg.AvailableSettings.*; -import static org.hibernate.ejb.AvailableSettings.*; -import static org.springframework.transaction.TransactionDefinition.*; - -@Configuration -@ComponentScan(basePackages = "io.trygvis") -@EnableTransactionManagement -@EnableJpaRepositories(basePackages = "io.trygvis.data") -public class Config { - - @Bean - public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { - return new PropertySourcesPlaceholderConfigurer() {{ -// setLocation(new UrlResource("file:environment.properties")); - setProperties(System.getProperties()); - setLocalOverride(true); - }}; - } - - @Bean - public JdbcTemplate jdbcTemplate(DataSource dataSource) { - return new JdbcTemplate(dataSource); - } - -// public SpringBeanJobFactory springBeanJobFactory() { -// SpringBeanJobFactory factory = new SpringBeanJobFactory(); -// return factory; -// } - -/* - @Bean - public SchedulerFactoryBean quartz(DataSource dataSource, PlatformTransactionManager transactionManager) { - SchedulerFactoryBean bean = new SchedulerFactoryBean(); - bean.setApplicationContextSchedulerContextKey("applicationContext"); - bean.setDataSource(dataSource); - bean.setTransactionManager(transactionManager); -// bean.setJobFactory(new JobFactory() { -// public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException { -// Class klass = bundle.getJobDetail().getJobClass(); -// } -// }); - - Properties quartzProperties = new Properties(); - quartzProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true"); -// quartzProperties.setProperty("org.quartz.jobStore.selectWithLockSQL", "false"); - quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); - quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); - bean.setQuartzProperties(quartzProperties); - return bean; - } -*/ - - // This turns out to be fairly useless as Spring won't register them automatically. - // It's probably better to use @Scheduled/@Async instead - /* - @Bean(name = "my-job") - public JobDetailFactoryBean myJobDetailBean() { - JobDetailFactoryBean bean = new JobDetailFactoryBean(); - bean.setJobClass(MyJob.class); - bean.setDurability(true); - - return bean; - } - - @Bean - public CronTriggerFactoryBean myJobTrigger(JobDetail jobDetail) { - CronTriggerFactoryBean bean = new CronTriggerFactoryBean(); - bean.setName("my-trigger"); - bean.setBeanName("my-job"); - bean.setJobDetail(jobDetail); - bean.setCronExpression("0/10 * * * * ?"); - - return bean; - } - */ - - @Bean - public DataSource dataSource(@Value("${database.url}") String jdbcUrl, - @Value("${database.username}") String username, - @Value("${database.password}") String password) { - BoneCPDataSource ds = new BoneCPDataSource(); - - ds.setLogStatementsEnabled(true); - - ds.setJdbcUrl(jdbcUrl); - ds.setUsername(username); - ds.setPassword(password); - - ds.setIdleConnectionTestPeriodInSeconds(60); - ds.setIdleMaxAgeInSeconds(240); - ds.setMaxConnectionsPerPartition(40); - ds.setMinConnectionsPerPartition(0); - ds.setPartitionCount(1); - ds.setAcquireIncrement(1); - ds.setStatementsCacheSize(1000); - ds.setReleaseHelperThreads(3); - ds.setStatisticsEnabled(true); - return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); - } - - @Bean - public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource, - @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl, - @Value("${hibernate.showSql:false}") boolean showSql, - @Value("${hibernate.dialect}") String dialect) { - LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean(); - x.setDataSource(dataSource); - x.setJpaPropertyMap(createJpaMap(hbm2ddl, showSql, dialect)); - x.setPackagesToScan(Article.class.getPackage().getName()); - HibernatePersistence persistenceProvider = new HibernatePersistence(); - x.setPersistenceProvider(persistenceProvider); - return x; - } - - public static Map createJpaMap(String hbm2ddl, boolean showSql, String dialect) { - Map map = new HashMap<>(); - map.put(HBM2DDL_AUTO, hbm2ddl); - map.put(FORMAT_SQL, showSql); - map.put(SHOW_SQL, showSql); - map.put(USE_SQL_COMMENTS, showSql); - map.put(GENERATE_STATISTICS, true); - map.put(NAMING_STRATEGY, ImprovedNamingStrategy.class.getName()); - - map.put(DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_WRITE.toString()); - map.put(CURRENT_SESSION_CONTEXT_CLASS, SpringSessionContext.class.getName()); -// map.put(CACHE_REGION_FACTORY, EhCacheRegionFactory.class.getName()); - map.put(USE_SECOND_LEVEL_CACHE, false); - map.put(USE_QUERY_CACHE, false); -// map.put(SHARED_CACHE_MODE, SharedCacheMode.ENABLE_SELECTIVE.toString()); - - map.put(DIALECT, dialect); - map.put("hibernate.temp.use_jdbc_metadata_defaults", "false"); - - return map; - } - - @Bean - public SessionFactory sessionFactory(LocalContainerEntityManagerFactoryBean entityManagerFactory) { - return ((HibernateEntityManagerFactory) entityManagerFactory.nativeEntityManagerFactory).getSessionFactory(); - } - - @Bean - public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { - return new JpaTransactionManager(entityManagerFactory); - } - - @Bean - public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { - DefaultTransactionDefinition td = new DefaultTransactionDefinition(); - td.setPropagationBehavior(PROPAGATION_REQUIRED); - td.setIsolationLevel(ISOLATION_READ_COMMITTED); - return new TransactionTemplate(platformTransactionManager, td); - } -} +package io.trygvis.spring; + +import com.jolbox.bonecp.BoneCPDataSource; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; +import org.springframework.orm.jpa.JpaTransactionManager; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.persistence.EntityManagerFactory; +import javax.sql.DataSource; + +@Configuration +@ComponentScan(basePackages = "io.trygvis") +@EnableTransactionManagement +//@EnableJpaRepositories(basePackages = "io.trygvis.data") +public class Config { + + @Bean + public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { + return new PropertySourcesPlaceholderConfigurer() {{ +// setLocation(new UrlResource("file:environment.properties")); + setProperties(System.getProperties()); + setLocalOverride(true); + }}; + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + +// public SpringBeanJobFactory springBeanJobFactory() { +// SpringBeanJobFactory factory = new SpringBeanJobFactory(); +// return factory; +// } + +/* + @Bean + public SchedulerFactoryBean quartz(DataSource dataSource, PlatformTransactionManager transactionManager) { + SchedulerFactoryBean bean = new SchedulerFactoryBean(); + bean.setApplicationContextSchedulerContextKey("applicationContext"); + bean.setDataSource(dataSource); + bean.setTransactionManager(transactionManager); +// bean.setJobFactory(new JobFactory() { +// public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException { +// Class klass = bundle.getJobDetail().getJobClass(); +// } +// }); + + Properties quartzProperties = new Properties(); + quartzProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true"); +// quartzProperties.setProperty("org.quartz.jobStore.selectWithLockSQL", "false"); + quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); + quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); + bean.setQuartzProperties(quartzProperties); + return bean; + } +*/ + + // This turns out to be fairly useless as Spring won't register them automatically. + // It's probably better to use @Scheduled/@Async instead + /* + @Bean(name = "my-job") + public JobDetailFactoryBean myJobDetailBean() { + JobDetailFactoryBean bean = new JobDetailFactoryBean(); + bean.setJobClass(MyJob.class); + bean.setDurability(true); + + return bean; + } + + @Bean + public CronTriggerFactoryBean myJobTrigger(JobDetail jobDetail) { + CronTriggerFactoryBean bean = new CronTriggerFactoryBean(); + bean.setName("my-trigger"); + bean.setBeanName("my-job"); + bean.setJobDetail(jobDetail); + bean.setCronExpression("0/10 * * * * ?"); + + return bean; + } + */ + + @Bean + public DataSource dataSource(@Value("${database.url}") String jdbcUrl, + @Value("${database.username}") String username, + @Value("${database.password}") String password) { + BoneCPDataSource ds = new BoneCPDataSource(); + + ds.setLogStatementsEnabled(true); + + ds.setJdbcUrl(jdbcUrl); + ds.setUsername(username); + ds.setPassword(password); + + ds.setIdleConnectionTestPeriodInSeconds(60); + ds.setIdleMaxAgeInSeconds(240); + ds.setMaxConnectionsPerPartition(40); + ds.setMinConnectionsPerPartition(0); + ds.setPartitionCount(1); + ds.setAcquireIncrement(1); + ds.setStatementsCacheSize(1000); + ds.setReleaseHelperThreads(3); + ds.setStatisticsEnabled(true); + return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); + } + +/* + @Bean + public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource, + @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl, + @Value("${hibernate.showSql:false}") boolean showSql, + @Value("${hibernate.dialect}") String dialect) { + LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean(); + x.setDataSource(dataSource); + x.setJpaPropertyMap(createJpaMap(hbm2ddl, showSql, dialect)); + x.setPackagesToScan(Article.class.getPackage().getName()); + HibernatePersistence persistenceProvider = new HibernatePersistence(); + x.setPersistenceProvider(persistenceProvider); + return x; + } + + public static Map createJpaMap(String hbm2ddl, boolean showSql, String dialect) { + Map map = new HashMap<>(); + map.put(HBM2DDL_AUTO, hbm2ddl); + map.put(FORMAT_SQL, showSql); + map.put(SHOW_SQL, showSql); + map.put(USE_SQL_COMMENTS, showSql); + map.put(GENERATE_STATISTICS, true); + map.put(NAMING_STRATEGY, ImprovedNamingStrategy.class.getName()); + + map.put(DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_WRITE.toString()); + map.put(CURRENT_SESSION_CONTEXT_CLASS, SpringSessionContext.class.getName()); +// map.put(CACHE_REGION_FACTORY, EhCacheRegionFactory.class.getName()); + map.put(USE_SECOND_LEVEL_CACHE, false); + map.put(USE_QUERY_CACHE, false); +// map.put(SHARED_CACHE_MODE, SharedCacheMode.ENABLE_SELECTIVE.toString()); + + map.put(DIALECT, dialect); + map.put("hibernate.temp.use_jdbc_metadata_defaults", "false"); + + return map; + } + + @Bean + public SessionFactory sessionFactory(LocalContainerEntityManagerFactoryBean entityManagerFactory) { + return ((HibernateEntityManagerFactory) entityManagerFactory.nativeEntityManagerFactory).getSessionFactory(); + } + + @Bean + public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { + return new JpaTransactionManager(entityManagerFactory); + } +*/ + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { + return new TransactionTemplate(platformTransactionManager); + } +} diff --git a/src/main/resources/applicationContext.xml b/src/main/resources/applicationContext.xml index 9856a0b..2d33d8c 100755 --- a/src/main/resources/applicationContext.xml +++ b/src/main/resources/applicationContext.xml @@ -1,22 +1,22 @@ - - - - - - + + + + + + diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index a25644b..2def2c8 100755 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -1,24 +1,25 @@ - - - - - - - - - - - - - - - - - %d{HH:mm:ss.SSS} [%-15thread] %-5level %-50logger{36} - %msg%n - - - - - - - + + + + + + + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%-15thread] %-5level %-60logger{60} - %msg%n + + + + + + + -- cgit v1.2.3