diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-15 19:40:00 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-15 19:40:00 +0200 |
commit | c274d9177e4a495e7b793120dfd1ce12fa5632c7 (patch) | |
tree | 99b556d99543d96a5f91959c7da875db7499b8ff /src/main | |
parent | 8477e8a888d584cf1352a4b69d7cefb2b7cd9ace (diff) | |
download | quartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.tar.gz quartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.tar.bz2 quartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.tar.xz quartz-based-queue-c274d9177e4a495e7b793120dfd1ce12fa5632c7.zip |
o Using the returned references to wait for completion.
Diffstat (limited to 'src/main')
-rwxr-xr-x | src/main/java/io/trygvis/Main.java | 24 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/model/Task.java | 9 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/AsyncService.java | 13 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/JpaAsyncService.java | 37 |
4 files changed, 75 insertions, 8 deletions
diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index 448c3e4..0a31aaa 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -11,6 +11,7 @@ import org.springframework.stereotype.*; import java.util.*;
import static java.lang.System.*;
+import static java.lang.Thread.*;
@Component
public class Main {
@@ -34,7 +35,7 @@ public class Main { try {
context.getBean(Main.class).run();
log.info("Sleeping");
- Thread.sleep(1000 * 1000);
+ sleep(1000 * 1000);
} catch (Exception e) {
e.printStackTrace(System.out);
}
@@ -71,5 +72,26 @@ public class Main { for (int i = 0; i < 10; i++) {
refs.add(asyncService.schedule(queue));
}
+
+ while (true) {
+ log.info("size = {}", refs.size());
+ for (Iterator<AsyncService.ExecutionRef> iterator = refs.iterator(); iterator.hasNext(); ) {
+ AsyncService.ExecutionRef ref = iterator.next();
+
+ ref = asyncService.update(ref);
+
+ log.info("ref = {}", ref);
+
+ if (ref.isDone()) {
+ iterator.remove();
+ }
+
+ sleep(100);
+ }
+
+ if (refs.isEmpty()) {
+ break;
+ }
+ }
}
}
diff --git a/src/main/java/io/trygvis/model/Task.java b/src/main/java/io/trygvis/model/Task.java index e86b623..24ac83d 100755 --- a/src/main/java/io/trygvis/model/Task.java +++ b/src/main/java/io/trygvis/model/Task.java @@ -9,7 +9,7 @@ public class Task { @Id
@SequenceGenerator(name = "task_seq", sequenceName = "task_seq")
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "task_seq")
- private Integer id;
+ private Long id;
@ManyToOne
private Queue queue;
@@ -44,7 +44,7 @@ public class Task { this.arguments = builder.toString();
}
- public Integer getId() {
+ public Long getId() {
return id;
}
@@ -68,8 +68,8 @@ public class Task { return completed;
}
- public void setCompleted(Date completed) {
- this.completed = completed;
+ public boolean isDone() {
+ return completed != null;
}
public void registerRun() {
@@ -87,6 +87,7 @@ public class Task { ", queue=" + queue +
", scheduled=" + scheduled +
", lastRun=" + lastRun +
+ ", runCount=" + runCount +
", completed=" + completed +
", arguments='" + arguments + '\'' +
'}';
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index f792f5e..b08db1f 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -2,6 +2,8 @@ package io.trygvis.queue; import org.quartz.*;
+import java.util.*;
+
public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
@@ -10,10 +12,21 @@ public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionR ExecutionRef schedule(QueueRef queue, String... args);
+ ExecutionRef update(ExecutionRef ref);
+
interface QueueRef {
}
interface ExecutionRef {
+ List<String> getArguments();
+
+ Date getScheduled();
+
+ Date getLastRun();
+
+ Date getCompleted();
+
+ boolean isDone();
}
interface AsyncCallable {
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java index 95d5ef3..e715ac7 100755 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ b/src/main/java/io/trygvis/queue/JpaAsyncService.java @@ -101,6 +101,11 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef return new JpaExecutionRef(task);
}
+ @Transactional(readOnly = true)
+ public JpaExecutionRef update(JpaExecutionRef ref) {
+ return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
+ }
+
public static class JpaQueueRef implements AsyncService.QueueRef {
public final Queue queue;
@@ -122,6 +127,26 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef this.task = task;
}
+ public List<String> getArguments() {
+ return Arrays.asList(task.getArguments());
+ }
+
+ public Date getScheduled() {
+ return task.getScheduled();
+ }
+
+ public Date getLastRun() {
+ return task.getLastRun();
+ }
+
+ public Date getCompleted() {
+ return task.getCompleted();
+ }
+
+ public boolean isDone() {
+ return task.isDone();
+ }
+
public String toString() {
return "JpaExecutionRef{" +
"task=" + task +
@@ -129,6 +154,12 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef }
}
+ private static class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+ }
+
private class CheckTimerTask implements Runnable {
private final AsyncCallable callable;
private final JpaQueueRef queueRef;
@@ -149,12 +180,12 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef for (final Task task : tasks) {
try {
executeTask(task);
- } catch (TransactionException e) {
+ } catch (TransactionException | TaskFailureException e) {
log.warn("Task execution failed", e);
}
}
} catch (Exception e) {
- log.warn("Error while execution tasks.", e);
+ log.warn("Error while executing tasks.", e);
}
}
@@ -177,7 +208,7 @@ public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef task.registerComplete(completed);
taskRepository.save(task);
} catch (Exception e) {
- throw new RuntimeException("Error while executing callback", e);
+ throw new TaskFailureException(e);
}
}
});
|