aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-04-15 19:40:00 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-04-15 19:40:00 +0200
commitc274d9177e4a495e7b793120dfd1ce12fa5632c7 (patch)
tree99b556d99543d96a5f91959c7da875db7499b8ff
parent8477e8a888d584cf1352a4b69d7cefb2b7cd9ace (diff)
downloadquartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.tar.gz
quartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.tar.bz2
quartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.tar.xz
quartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.zip
o Using the returned references to wait for completion.
-rwxr-xr-xsrc/main/java/io/trygvis/Main.java24
-rwxr-xr-xsrc/main/java/io/trygvis/model/Task.java9
-rwxr-xr-xsrc/main/java/io/trygvis/queue/AsyncService.java13
-rwxr-xr-xsrc/main/java/io/trygvis/queue/JpaAsyncService.java37
4 files changed, 75 insertions, 8 deletions
diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java
index 448c3e4..0a31aaa 100755
--- a/src/main/java/io/trygvis/Main.java
+++ b/src/main/java/io/trygvis/Main.java
@@ -11,6 +11,7 @@ import org.springframework.stereotype.*;
import java.util.*;
import static java.lang.System.*;
+import static java.lang.Thread.*;
@Component
public class Main {
@@ -34,7 +35,7 @@ public class Main {
try {
context.getBean(Main.class).run();
log.info("Sleeping");
- Thread.sleep(1000 * 1000);
+ sleep(1000 * 1000);
} catch (Exception e) {
e.printStackTrace(System.out);
}
@@ -71,5 +72,26 @@ public class Main {
for (int i = 0; i < 10; i++) {
refs.add(asyncService.schedule(queue));
}
+
+ while (true) {
+ log.info("size = {}", refs.size());
+ for (Iterator<AsyncService.ExecutionRef> iterator = refs.iterator(); iterator.hasNext(); ) {
+ AsyncService.ExecutionRef ref = iterator.next();
+
+ ref = asyncService.update(ref);
+
+ log.info("ref = {}", ref);
+
+ if (ref.isDone()) {
+ iterator.remove();
+ }
+
+ sleep(100);
+ }
+
+ if (refs.isEmpty()) {
+ break;
+ }
+ }
}
}
diff --git a/src/main/java/io/trygvis/model/Task.java b/src/main/java/io/trygvis/model/Task.java
index e86b623..24ac83d 100755
--- a/src/main/java/io/trygvis/model/Task.java
+++ b/src/main/java/io/trygvis/model/Task.java
@@ -9,7 +9,7 @@ public class Task {
@Id
@SequenceGenerator(name = "task_seq", sequenceName = "task_seq")
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "task_seq")
- private Integer id;
+ private Long id;
@ManyToOne
private Queue queue;
@@ -44,7 +44,7 @@ public class Task {
this.arguments = builder.toString();
}
- public Integer getId() {
+ public Long getId() {
return id;
}
@@ -68,8 +68,8 @@ public class Task {
return completed;
}
- public void setCompleted(Date completed) {
- this.completed = completed;
+ public boolean isDone() {
+ return completed != null;
}
public void registerRun() {
@@ -87,6 +87,7 @@ public class Task {
", queue=" + queue +
", scheduled=" + scheduled +
", lastRun=" + lastRun +
+ ", runCount=" + runCount +
", completed=" + completed +
", arguments='" + arguments + '\'' +
'}';
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java
index f792f5e..b08db1f 100755
--- a/src/main/java/io/trygvis/queue/AsyncService.java
+++ b/src/main/java/io/trygvis/queue/AsyncService.java
@@ -2,6 +2,8 @@ package io.trygvis.queue;
import org.quartz.*;
+import java.util.*;
+
public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
@@ -10,10 +12,21 @@ public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionR
ExecutionRef schedule(QueueRef queue, String... args);
+ ExecutionRef update(ExecutionRef ref);
+
interface QueueRef {
}
interface ExecutionRef {
+ List<String> getArguments();
+
+ Date getScheduled();
+
+ Date getLastRun();
+
+ Date getCompleted();
+
+ boolean isDone();
}
interface AsyncCallable {
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java
index 95d5ef3..e715ac7 100755
--- a/src/main/java/io/trygvis/queue/JpaAsyncService.java
+++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java
@@ -101,6 +101,11 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
return new JpaExecutionRef(task);
}
+ @Transactional(readOnly = true)
+ public JpaExecutionRef update(JpaExecutionRef ref) {
+ return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
+ }
+
public static class JpaQueueRef implements AsyncService.QueueRef {
public final Queue queue;
@@ -122,6 +127,26 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
this.task = task;
}
+ public List<String> getArguments() {
+ return Arrays.asList(task.getArguments());
+ }
+
+ public Date getScheduled() {
+ return task.getScheduled();
+ }
+
+ public Date getLastRun() {
+ return task.getLastRun();
+ }
+
+ public Date getCompleted() {
+ return task.getCompleted();
+ }
+
+ public boolean isDone() {
+ return task.isDone();
+ }
+
public String toString() {
return "JpaExecutionRef{" +
"task=" + task +
@@ -129,6 +154,12 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
}
}
+ private static class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+ }
+
private class CheckTimerTask implements Runnable {
private final AsyncCallable callable;
private final JpaQueueRef queueRef;
@@ -149,12 +180,12 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
for (final Task task : tasks) {
try {
executeTask(task);
- } catch (TransactionException e) {
+ } catch (TransactionException | TaskFailureException e) {
log.warn("Task execution failed", e);
}
}
} catch (Exception e) {
- log.warn("Error while execution tasks.", e);
+ log.warn("Error while executing tasks.", e);
}
}
@@ -177,7 +208,7 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef
task.registerComplete(completed);
taskRepository.save(task);
} catch (Exception e) {
- throw new RuntimeException("Error while executing callback", e);
+ throw new TaskFailureException(e);
}
}
});