diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-20 17:29:18 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-20 17:31:02 +0200 |
commit | a03d5154456587fc7920e632f083cc5f1e4318a9 (patch) | |
tree | 08cddc03fc61aae0cfd4deb08bd8f99aca19bd40 /src/main/java | |
parent | 637dddf11f5d60b35c9696914e1e2658b2ddc611 (diff) | |
download | quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.tar.gz quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.tar.bz2 quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.tar.xz quartz-based-queue-a03d5154456587fc7920e632f083cc5f1e4318a9.zip |
wip
Diffstat (limited to 'src/main/java')
-rwxr-xr-x | src/main/java/io/trygvis/CreateArticleCallable.java | 11 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/Main.java | 219 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/UpdateArticleCallable.java | 26 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/data/QueueRepository.java | 16 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/data/TaskRepository.java | 22 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/model/Article.java | 114 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/AsyncService.java | 58 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcAsyncService.java | 115 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueThread.java | 116 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 11 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskFailureException.java | 7 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/spring/Config.java | 353 |
12 files changed, 563 insertions, 505 deletions
diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java index 85734fc..671f3dc 100755 --- a/src/main/java/io/trygvis/CreateArticleCallable.java +++ b/src/main/java/io/trygvis/CreateArticleCallable.java @@ -6,7 +6,6 @@ import org.slf4j.*; import org.springframework.stereotype.*; import org.springframework.transaction.annotation.*; -import javax.persistence.*; import java.util.*; import static org.springframework.transaction.annotation.Propagation.*; @@ -16,15 +15,15 @@ import static org.springframework.transaction.annotation.Propagation.*; public class CreateArticleCallable implements AsyncService.AsyncCallable { private final Logger log = LoggerFactory.getLogger(getClass()); - @PersistenceContext - private EntityManager entityManager; +// @PersistenceContext +// private EntityManager entityManager; private Random random = new Random(); - public void run() throws Exception { + public void run(List<String> arguments) throws Exception { log.info("CreateArticeJob.run: BEGIN"); - if (random.nextBoolean()) { + if (random.nextInt() % 3 == 0) { throw new RuntimeException("failing create article"); } @@ -33,7 +32,7 @@ public class CreateArticleCallable implements AsyncService.AsyncCallable { log.info("now = {}", now); Article article = new Article(new Date(), null, "title", "body"); - entityManager.persist(article); +// entityManager.persist(article); log.info("CreateArticeJob.run: END"); } diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index f2f540f..f1bba26 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -1,108 +1,111 @@ -package io.trygvis;
-
-import io.trygvis.queue.*;
-import io.trygvis.queue.Queue;
-import org.hibernate.dialect.*;
-import org.slf4j.*;
-import org.slf4j.bridge.*;
-import org.springframework.beans.factory.annotation.*;
-import org.springframework.context.support.*;
-import org.springframework.stereotype.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.support.*;
-
-import java.util.*;
-
-import static java.lang.System.*;
-import static java.lang.Thread.*;
-import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED;
-
-@Component
-public class Main {
- private static final Logger log = LoggerFactory.getLogger(Main.class);
-
- public static void main(String[] args) throws Exception {
- SLF4JBridgeHandler.install();
-
- String username = getProperty("user.name");
- setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
- setProperty("database.username", username);
- setProperty("database.password", username);
-// setProperty("hibernate.showSql", "true");
- setProperty("hibernate.hbm2ddl.auto", "create"); // create
- setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName());
-
- log.info("Starting context");
- ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
- log.info("Started context");
-
- try {
- context.getBean(Main.class).run();
- log.info("Sleeping");
- sleep(1000 * 1000);
- } catch (Exception e) {
- e.printStackTrace(System.out);
- }
-
- log.info("Stopping context");
- context.stop();
- log.info("Stopped context");
-
- exit(0);
- }
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private AsyncService asyncService;
-
- @Autowired
- @Qualifier("createArticle")
- private AsyncService.AsyncCallable createArticleCallable;
-
- @Autowired
- @Qualifier("createArticle")
- private AsyncService.AsyncCallable/*UpdateArticleCallable*/ updateArticleCallable;
-
- public void run() throws Exception {
- log.info("Main.run");
-
- final Queue q = asyncService.registerQueue("create-queue", 10, createArticleCallable);
-// log.info("queue registered: ref = {}", q);
-// asyncService.registerQueue("update-queue", 1, updateArticeCallable);
-
-// q = asyncService.getQueue("create-queue");
-
- final List<Task> tasks = new ArrayList<>();
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- for (int i = 0; i < 1; i++) {
- tasks.add(asyncService.schedule(q));
- }
- }
- });
-
- while (true) {
- sleep(10000);
-
- log.info("tasks.size = {}", tasks.size());
- for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
- Task task = iterator.next();
-
- task = asyncService.update(task);
-
- log.info("task = {}", task);
-
- if (task.isDone()) {
- iterator.remove();
- }
- }
-
- if (tasks.isEmpty()) {
- break;
- }
- }
- }
-}
+package io.trygvis; + +import io.trygvis.queue.*; +import io.trygvis.queue.Queue; +import org.hibernate.dialect.*; +import org.slf4j.*; +import org.slf4j.bridge.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.context.support.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.support.*; + +import java.util.*; + +import static java.lang.System.*; +import static java.lang.Thread.*; + +@Component +public class Main { + private static final Logger log = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) throws Exception { + SLF4JBridgeHandler.install(); + + String username = getProperty("user.name"); + setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); + setProperty("database.username", username); + setProperty("database.password", username); +// setProperty("hibernate.showSql", "true"); + setProperty("hibernate.hbm2ddl.auto", "create"); // create + setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName()); + + log.info("Starting context"); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); + log.info("Started context"); + + try { + context.getBean(Main.class).run(); +// log.info("Sleeping"); +// sleep(1000 * 1000); + } catch (Exception e) { + e.printStackTrace(System.out); + } + + log.info("Stopping context"); + context.stop(); + log.info("Stopped context"); + + exit(0); + } + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private AsyncService asyncService; + + @Autowired + @Qualifier("createArticle") + private AsyncService.AsyncCallable createArticleCallable; + + @Autowired + @Qualifier("updateArticle") + private AsyncService.AsyncCallable updateArticleCallable; + + public void run() throws Exception { + log.info("Main.run"); + + final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable); +// log.info("queue registered: ref = {}", q); +// asyncService.registerQueue("update-queue", 1, updateArticleCallable); + +// q = asyncService.getQueue("create-queue"); + + final List<Task> tasks = new ArrayList<>(); + + final int count = 1; + log.info("Creating {} tasks", count); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + for (int i = 0; i < count; i++) { + tasks.add(asyncService.schedule(q)); + } + } + }); + log.info("Created {} tasks", count); + + while (true) { + sleep(10000); + + log.info("Checking for status of {} tasks", tasks.size()); + for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) { + Task task = iterator.next(); + + task = asyncService.update(task); + +// log.info("task = {}", task); + + if (task.isDone()) { + iterator.remove(); + } + } + + if (tasks.isEmpty()) { + log.info("No more tasks"); + break; + } + } + } +} diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java index 7ed8b63..f1ea0e2 100755 --- a/src/main/java/io/trygvis/UpdateArticleCallable.java +++ b/src/main/java/io/trygvis/UpdateArticleCallable.java @@ -1,34 +1,33 @@ package io.trygvis; -import io.trygvis.model.*; -import io.trygvis.queue.*; -import org.slf4j.*; -import org.springframework.stereotype.*; -import org.springframework.transaction.annotation.*; +import io.trygvis.queue.AsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; -import java.util.*; -import javax.persistence.*; +import java.util.Date; +import java.util.List; +import java.util.Random; -import static org.springframework.transaction.annotation.Propagation.*; - -@Component +@Component("updateArticle") public class UpdateArticleCallable implements AsyncService.AsyncCallable { private final Logger log = LoggerFactory.getLogger(getClass()); private final Random r = new Random(); - @PersistenceContext - private EntityManager entityManager; +// @PersistenceContext +// private EntityManager entityManager; // @Transactional(propagation = REQUIRES_NEW) - public void run() throws Exception { + public void run(List<String> arguments) throws Exception { log.info("UpdateArticeJob.run: BEGIN"); Date now = new Date(); log.info("now = {}", now); +/* TypedQuery<Article> q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); List<Article> list = q.getResultList(); @@ -38,6 +37,7 @@ public class UpdateArticleCallable a.setUpdated(new Date()); entityManager.persist(a); +*/ log.info("UpdateArticeJob.run: END"); } diff --git a/src/main/java/io/trygvis/data/QueueRepository.java b/src/main/java/io/trygvis/data/QueueRepository.java index 47ed478..604d5fe 100755 --- a/src/main/java/io/trygvis/data/QueueRepository.java +++ b/src/main/java/io/trygvis/data/QueueRepository.java @@ -1,8 +1,8 @@ -package io.trygvis.data;
-
-import io.trygvis.queue.*;
-import org.springframework.data.jpa.repository.*;
-
-public interface QueueRepository extends JpaRepository<Queue, Long> {
- Queue findByName(String name);
-}
+package io.trygvis.data; + +import io.trygvis.queue.*; +import org.springframework.data.jpa.repository.*; + +public interface QueueRepository extends JpaRepository<Queue, Long> { + Queue findByName(String name); +} diff --git a/src/main/java/io/trygvis/data/TaskRepository.java b/src/main/java/io/trygvis/data/TaskRepository.java index b0710e3..efe4903 100755 --- a/src/main/java/io/trygvis/data/TaskRepository.java +++ b/src/main/java/io/trygvis/data/TaskRepository.java @@ -1,11 +1,11 @@ -package io.trygvis.data;
-
-import io.trygvis.queue.*;
-import io.trygvis.queue.Queue;
-import org.springframework.data.jpa.repository.*;
-
-import java.util.*;
-
-public interface TaskRepository extends JpaRepository<Task, Long> {
- List<Task> findByQueueAndCompletedIsNull(Queue queue);
-}
+package io.trygvis.data; + +import io.trygvis.queue.*; +import io.trygvis.queue.Queue; +import org.springframework.data.jpa.repository.*; + +import java.util.*; + +public interface TaskRepository extends JpaRepository<Task, Long> { + List<Task> findByQueueAndCompletedIsNull(Queue queue); +} diff --git a/src/main/java/io/trygvis/model/Article.java b/src/main/java/io/trygvis/model/Article.java index 6383e50..d144217 100755 --- a/src/main/java/io/trygvis/model/Article.java +++ b/src/main/java/io/trygvis/model/Article.java @@ -1,57 +1,57 @@ -package io.trygvis.model;
-
-import java.util.Date;
-
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.SequenceGenerator;
-import javax.persistence.Table;
-
-@Entity
-public class Article {
- @Id
- @SequenceGenerator(name="id_seq", sequenceName="id_seq")
- @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_seq")
- private Integer id;
- private Date created;
- private Date updated;
- private String title;
- private String body;
-
- @SuppressWarnings("UnusedDeclaration")
- private Article() {
- }
-
- public Article(Date created, Date updated, String title, String body) {
- this.created = created;
- this.updated = updated;
- this.title = title;
- this.body = body;
- }
-
- public Integer getId() {
- return id;
- }
-
- public Date getCreated() {
- return created;
- }
-
- public Date getUpdated() {
- return updated;
- }
-
- public void setUpdated(Date updated) {
- this.updated = updated;
- }
-
- public String getTitle() {
- return title;
- }
-
- public String getBody() {
- return body;
- }
-}
+package io.trygvis.model; + +import java.util.Date; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.SequenceGenerator; +import javax.persistence.Table; + +@Entity +public class Article { + @Id + @SequenceGenerator(name="id_seq", sequenceName="id_seq") + @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "id_seq") + private Integer id; + private Date created; + private Date updated; + private String title; + private String body; + + @SuppressWarnings("UnusedDeclaration") + private Article() { + } + + public Article(Date created, Date updated, String title, String body) { + this.created = created; + this.updated = updated; + this.title = title; + this.body = body; + } + + public Integer getId() { + return id; + } + + public Date getCreated() { + return created; + } + + public Date getUpdated() { + return updated; + } + + public void setUpdated(Date updated) { + this.updated = updated; + } + + public String getTitle() { + return title; + } + + public String getBody() { + return body; + } +} diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index 10f1b79..b42b550 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -1,28 +1,30 @@ -package io.trygvis.queue;
-
-import org.quartz.*;
-
-public interface AsyncService {
-
- /**
- * @param name
- * @param interval how often the queue should be polled for missed tasks in seconds.
- * @param callable
- * @return
- * @throws SchedulerException
- */
- Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
-
- Queue getQueue(String name);
-
- Task schedule(Queue queue, String... args);
-
- /**
- * Polls for a new state of the execution.
- */
- Task update(Task ref);
-
- interface AsyncCallable {
- void run() throws Exception;
- }
-}
+package io.trygvis.queue; + +import org.quartz.*; + +import java.util.List; + +public interface AsyncService { + + /** + * @param name + * @param interval how often the queue should be polled for missed tasks in seconds. + * @param callable + * @return + * @throws SchedulerException + */ + Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + + Queue getQueue(String name); + + Task schedule(Queue queue, String... args); + + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); + + interface AsyncCallable { + void run(List<String> arguments) throws Exception; + } +} diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java index 1df0ab6..a8f581e 100644 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -4,7 +4,6 @@ import org.quartz.*; import org.slf4j.*; import org.springframework.beans.factory.annotation.*; import org.springframework.stereotype.*; -import org.springframework.transaction.*; import org.springframework.transaction.annotation.*; import org.springframework.transaction.support.*; @@ -43,7 +42,7 @@ public class JdbcAsyncService implements AsyncService { final long interval_; if (q == null) { - q = new Queue(name, interval * 1000); + q = new Queue(name, interval); queueDao.insert(q); interval_ = interval; } else { @@ -51,18 +50,19 @@ public class JdbcAsyncService implements AsyncService { interval_ = q.interval; } - final QueueThread queueThread = new QueueThread(q, callable); + final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); queues.put(name, queueThread); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { - log.info("status = {}", status); + log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { + log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } - }, 1000, 1000 * interval_, MILLISECONDS); + }, 10, interval_, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); @@ -85,9 +85,7 @@ public class JdbcAsyncService implements AsyncService { } @Transactional(propagation = REQUIRED) - public Task schedule(Queue queue, String... args) { - log.info("schedule: ENTER"); - + public Task schedule(final Queue queue, String... args) { Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); @@ -97,15 +95,22 @@ public class JdbcAsyncService implements AsyncService { long id = taskDao.insert(queue.name, scheduled, arguments.toString()); Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); - log.info("task = {}", task); - queues.get(queue.name).ping(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } + log.info("Created task = {}", task); +// queues.get(queue.name).ping(); +// try { +// Thread.sleep(500); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queues.get(queue.name).ping(); + } + } + }); - log.info("schedule: LEAVE"); return task; } @@ -113,82 +118,4 @@ public class JdbcAsyncService implements AsyncService { public Task update(Task ref) { return taskDao.findById(ref.id); } - - class QueueThread implements Runnable { - public boolean shouldRun = true; - - public final Queue queue; - - private final AsyncCallable callable; - - QueueThread(Queue queue, AsyncCallable callable) { - this.queue = queue; - this.callable = callable; - } - - public void ping() { - log.info("Sending ping to " + queue); - synchronized (this) { - notify(); - } - } - - public void run() { - while (shouldRun) { - List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - - synchronized (this) { - try { - wait(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - taskDao.update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } } diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java new file mode 100644 index 0000000..a981c7e --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueThread.java @@ -0,0 +1,116 @@ +package io.trygvis.queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.List; + +class QueueThread implements Runnable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean busy; + + public final Queue queue; + + private final TaskDao taskDao; + + private final TransactionTemplate transactionTemplate; + + private final AsyncService.AsyncCallable callable; + + QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { + this.queue = queue; + this.taskDao = taskDao; + this.transactionTemplate = transactionTemplate; + this.callable = callable; + } + + public void ping() { + synchronized (this) { + if (!busy) { + log.info("Sending ping to " + queue); + notify(); + } else { + checkForNewTasks = true; + } + } + } + + public void run() { + while (shouldRun) { + try { + List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { + public List<Task> doInTransaction(TransactionStatus status) { + return taskDao.findByNameAndCompletedIsNull(queue.name); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + if(tasks.size() > 0) { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } + } catch (Throwable e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + busy = false; + + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + continue; + } + + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + + busy = true; + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(task.arguments); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 2e407a5..2bf2145 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -3,12 +3,16 @@ package io.trygvis.queue; import org.springframework.beans.factory.annotation.*; import org.springframework.jdbc.core.*; import org.springframework.stereotype.*; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import java.sql.*; import java.util.Date; import java.util.*; import static java.util.Arrays.*; +import static org.springframework.transaction.annotation.Propagation.MANDATORY; +import static org.springframework.transaction.annotation.Propagation.REQUIRED; @Component public class TaskDao { @@ -16,22 +20,26 @@ public class TaskDao { @Autowired private JdbcTemplate jdbcTemplate; + @Transactional(propagation = MANDATORY) public long insert(String queue, Date scheduled, String arguments) { jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " + "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments); return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); } + @Transactional(propagation = MANDATORY) public Task findById(long id) { return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", new TaskRowMapper(), id); } + @Transactional(propagation = MANDATORY) public List<Task> findByNameAndCompletedIsNull(String name) { return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", new TaskRowMapper(), name); } + @Transactional(propagation = MANDATORY) public void update(Task task) { jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", task.scheduled, task.lastRun, task.runCount, task.completed, task.id); @@ -41,6 +49,7 @@ public class TaskDao { public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments"; public Task mapRow(ResultSet rs, int rowNum) throws SQLException { + String arguments = rs.getString(7); return new Task( rs.getLong(1), rs.getString(2), @@ -48,7 +57,7 @@ public class TaskDao { rs.getTimestamp(4), rs.getInt(5), rs.getTimestamp(6), - asList(rs.getString(7).split(" "))); + arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList()); } } } diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/queue/TaskFailureException.java new file mode 100644 index 0000000..d3d8c48 --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } +} diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java index 5dd845f..ca75049 100755 --- a/src/main/java/io/trygvis/spring/Config.java +++ b/src/main/java/io/trygvis/spring/Config.java @@ -1,179 +1,174 @@ -package io.trygvis.spring;
-
-import com.jolbox.bonecp.*;
-import io.trygvis.model.*;
-import org.hibernate.*;
-import org.hibernate.annotations.*;
-import org.hibernate.cfg.*;
-import org.hibernate.ejb.*;
-import org.springframework.beans.factory.annotation.*;
-import org.springframework.context.annotation.*;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.support.*;
-import org.springframework.data.jpa.repository.config.*;
-import org.springframework.jdbc.core.*;
-import org.springframework.jdbc.datasource.*;
-import org.springframework.orm.hibernate4.*;
-import org.springframework.orm.jpa.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.annotation.*;
-import org.springframework.transaction.support.*;
-
-import javax.persistence.*;
-import javax.sql.*;
-import java.util.*;
-
-import static org.hibernate.cfg.AvailableSettings.*;
-import static org.hibernate.ejb.AvailableSettings.*;
-import static org.springframework.transaction.TransactionDefinition.*;
-
-@Configuration
-@ComponentScan(basePackages = "io.trygvis")
-@EnableTransactionManagement
-@EnableJpaRepositories(basePackages = "io.trygvis.data")
-public class Config {
-
- @Bean
- public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception {
- return new PropertySourcesPlaceholderConfigurer() {{
-// setLocation(new UrlResource("file:environment.properties"));
- setProperties(System.getProperties());
- setLocalOverride(true);
- }};
- }
-
- @Bean
- public JdbcTemplate jdbcTemplate(DataSource dataSource) {
- return new JdbcTemplate(dataSource);
- }
-
-// public SpringBeanJobFactory springBeanJobFactory() {
-// SpringBeanJobFactory factory = new SpringBeanJobFactory();
-// return factory;
-// }
-
-/*
- @Bean
- public SchedulerFactoryBean quartz(DataSource dataSource, PlatformTransactionManager transactionManager) {
- SchedulerFactoryBean bean = new SchedulerFactoryBean();
- bean.setApplicationContextSchedulerContextKey("applicationContext");
- bean.setDataSource(dataSource);
- bean.setTransactionManager(transactionManager);
-// bean.setJobFactory(new JobFactory() {
-// public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
-// Class<? extends Job> klass = bundle.getJobDetail().getJobClass();
-// }
-// });
-
- Properties quartzProperties = new Properties();
- quartzProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
-// quartzProperties.setProperty("org.quartz.jobStore.selectWithLockSQL", "false");
- quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName());
- quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true");
- bean.setQuartzProperties(quartzProperties);
- return bean;
- }
-*/
-
- // This turns out to be fairly useless as Spring won't register them automatically.
- // It's probably better to use @Scheduled/@Async instead
- /*
- @Bean(name = "my-job")
- public JobDetailFactoryBean myJobDetailBean() {
- JobDetailFactoryBean bean = new JobDetailFactoryBean();
- bean.setJobClass(MyJob.class);
- bean.setDurability(true);
-
- return bean;
- }
-
- @Bean
- public CronTriggerFactoryBean myJobTrigger(JobDetail jobDetail) {
- CronTriggerFactoryBean bean = new CronTriggerFactoryBean();
- bean.setName("my-trigger");
- bean.setBeanName("my-job");
- bean.setJobDetail(jobDetail);
- bean.setCronExpression("0/10 * * * * ?");
-
- return bean;
- }
- */
-
- @Bean
- public DataSource dataSource(@Value("${database.url}") String jdbcUrl,
- @Value("${database.username}") String username,
- @Value("${database.password}") String password) {
- BoneCPDataSource ds = new BoneCPDataSource();
-
- ds.setLogStatementsEnabled(true);
-
- ds.setJdbcUrl(jdbcUrl);
- ds.setUsername(username);
- ds.setPassword(password);
-
- ds.setIdleConnectionTestPeriodInSeconds(60);
- ds.setIdleMaxAgeInSeconds(240);
- ds.setMaxConnectionsPerPartition(40);
- ds.setMinConnectionsPerPartition(0);
- ds.setPartitionCount(1);
- ds.setAcquireIncrement(1);
- ds.setStatementsCacheSize(1000);
- ds.setReleaseHelperThreads(3);
- ds.setStatisticsEnabled(true);
- return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
- }
-
- @Bean
- public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource,
- @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl,
- @Value("${hibernate.showSql:false}") boolean showSql,
- @Value("${hibernate.dialect}") String dialect) {
- LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean();
- x.setDataSource(dataSource);
- x.setJpaPropertyMap(createJpaMap(hbm2ddl, showSql, dialect));
- x.setPackagesToScan(Article.class.getPackage().getName());
- HibernatePersistence persistenceProvider = new HibernatePersistence();
- x.setPersistenceProvider(persistenceProvider);
- return x;
- }
-
- public static Map<String, Object> createJpaMap(String hbm2ddl, boolean showSql, String dialect) {
- Map<String, Object> map = new HashMap<>();
- map.put(HBM2DDL_AUTO, hbm2ddl);
- map.put(FORMAT_SQL, showSql);
- map.put(SHOW_SQL, showSql);
- map.put(USE_SQL_COMMENTS, showSql);
- map.put(GENERATE_STATISTICS, true);
- map.put(NAMING_STRATEGY, ImprovedNamingStrategy.class.getName());
-
- map.put(DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_WRITE.toString());
- map.put(CURRENT_SESSION_CONTEXT_CLASS, SpringSessionContext.class.getName());
-// map.put(CACHE_REGION_FACTORY, EhCacheRegionFactory.class.getName());
- map.put(USE_SECOND_LEVEL_CACHE, false);
- map.put(USE_QUERY_CACHE, false);
-// map.put(SHARED_CACHE_MODE, SharedCacheMode.ENABLE_SELECTIVE.toString());
-
- map.put(DIALECT, dialect);
- map.put("hibernate.temp.use_jdbc_metadata_defaults", "false");
-
- return map;
- }
-
- @Bean
- public SessionFactory sessionFactory(LocalContainerEntityManagerFactoryBean entityManagerFactory) {
- return ((HibernateEntityManagerFactory) entityManagerFactory.nativeEntityManagerFactory).getSessionFactory();
- }
-
- @Bean
- public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
- return new JpaTransactionManager(entityManagerFactory);
- }
-
- @Bean
- public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
- DefaultTransactionDefinition td = new DefaultTransactionDefinition();
- td.setPropagationBehavior(PROPAGATION_REQUIRED);
- td.setIsolationLevel(ISOLATION_READ_COMMITTED);
- return new TransactionTemplate(platformTransactionManager, td);
- }
-}
+package io.trygvis.spring; + +import com.jolbox.bonecp.BoneCPDataSource; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; +import org.springframework.orm.jpa.JpaTransactionManager; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.persistence.EntityManagerFactory; +import javax.sql.DataSource; + +@Configuration +@ComponentScan(basePackages = "io.trygvis") +@EnableTransactionManagement +//@EnableJpaRepositories(basePackages = "io.trygvis.data") +public class Config { + + @Bean + public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { + return new PropertySourcesPlaceholderConfigurer() {{ +// setLocation(new UrlResource("file:environment.properties")); + setProperties(System.getProperties()); + setLocalOverride(true); + }}; + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + +// public SpringBeanJobFactory springBeanJobFactory() { +// SpringBeanJobFactory factory = new SpringBeanJobFactory(); +// return factory; +// } + +/* + @Bean + public SchedulerFactoryBean quartz(DataSource dataSource, PlatformTransactionManager transactionManager) { + SchedulerFactoryBean bean = new SchedulerFactoryBean(); + bean.setApplicationContextSchedulerContextKey("applicationContext"); + bean.setDataSource(dataSource); + bean.setTransactionManager(transactionManager); +// bean.setJobFactory(new JobFactory() { +// public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException { +// Class<? extends Job> klass = bundle.getJobDetail().getJobClass(); +// } +// }); + + Properties quartzProperties = new Properties(); + quartzProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true"); +// quartzProperties.setProperty("org.quartz.jobStore.selectWithLockSQL", "false"); + quartzProperties.setProperty("org.quartz.jobStore.driverDelegateClass", PostgreSQLDelegate.class.getName()); + quartzProperties.setProperty("org.quartz.scheduler.jmx.export", "true"); + bean.setQuartzProperties(quartzProperties); + return bean; + } +*/ + + // This turns out to be fairly useless as Spring won't register them automatically. + // It's probably better to use @Scheduled/@Async instead + /* + @Bean(name = "my-job") + public JobDetailFactoryBean myJobDetailBean() { + JobDetailFactoryBean bean = new JobDetailFactoryBean(); + bean.setJobClass(MyJob.class); + bean.setDurability(true); + + return bean; + } + + @Bean + public CronTriggerFactoryBean myJobTrigger(JobDetail jobDetail) { + CronTriggerFactoryBean bean = new CronTriggerFactoryBean(); + bean.setName("my-trigger"); + bean.setBeanName("my-job"); + bean.setJobDetail(jobDetail); + bean.setCronExpression("0/10 * * * * ?"); + + return bean; + } + */ + + @Bean + public DataSource dataSource(@Value("${database.url}") String jdbcUrl, + @Value("${database.username}") String username, + @Value("${database.password}") String password) { + BoneCPDataSource ds = new BoneCPDataSource(); + + ds.setLogStatementsEnabled(true); + + ds.setJdbcUrl(jdbcUrl); + ds.setUsername(username); + ds.setPassword(password); + + ds.setIdleConnectionTestPeriodInSeconds(60); + ds.setIdleMaxAgeInSeconds(240); + ds.setMaxConnectionsPerPartition(40); + ds.setMinConnectionsPerPartition(0); + ds.setPartitionCount(1); + ds.setAcquireIncrement(1); + ds.setStatementsCacheSize(1000); + ds.setReleaseHelperThreads(3); + ds.setStatisticsEnabled(true); + return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); + } + +/* + @Bean + public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource, + @Value("${hibernate.hbm2ddl.auto}") String hbm2ddl, + @Value("${hibernate.showSql:false}") boolean showSql, + @Value("${hibernate.dialect}") String dialect) { + LocalContainerEntityManagerFactoryBean x = new LocalContainerEntityManagerFactoryBean(); + x.setDataSource(dataSource); + x.setJpaPropertyMap(createJpaMap(hbm2ddl, showSql, dialect)); + x.setPackagesToScan(Article.class.getPackage().getName()); + HibernatePersistence persistenceProvider = new HibernatePersistence(); + x.setPersistenceProvider(persistenceProvider); + return x; + } + + public static Map<String, Object> createJpaMap(String hbm2ddl, boolean showSql, String dialect) { + Map<String, Object> map = new HashMap<>(); + map.put(HBM2DDL_AUTO, hbm2ddl); + map.put(FORMAT_SQL, showSql); + map.put(SHOW_SQL, showSql); + map.put(USE_SQL_COMMENTS, showSql); + map.put(GENERATE_STATISTICS, true); + map.put(NAMING_STRATEGY, ImprovedNamingStrategy.class.getName()); + + map.put(DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_WRITE.toString()); + map.put(CURRENT_SESSION_CONTEXT_CLASS, SpringSessionContext.class.getName()); +// map.put(CACHE_REGION_FACTORY, EhCacheRegionFactory.class.getName()); + map.put(USE_SECOND_LEVEL_CACHE, false); + map.put(USE_QUERY_CACHE, false); +// map.put(SHARED_CACHE_MODE, SharedCacheMode.ENABLE_SELECTIVE.toString()); + + map.put(DIALECT, dialect); + map.put("hibernate.temp.use_jdbc_metadata_defaults", "false"); + + return map; + } + + @Bean + public SessionFactory sessionFactory(LocalContainerEntityManagerFactoryBean entityManagerFactory) { + return ((HibernateEntityManagerFactory) entityManagerFactory.nativeEntityManagerFactory).getSessionFactory(); + } + + @Bean + public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { + return new JpaTransactionManager(entityManagerFactory); + } +*/ + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { + return new TransactionTemplate(platformTransactionManager); + } +} |