aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsrc/main/java/io/trygvis/queue/AsyncService.java2
-rw-r--r--src/main/java/io/trygvis/queue/JdbcAsyncService.java12
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java10
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java23
-rw-r--r--src/main/resources/create.sql6
5 files changed, 37 insertions, 16 deletions
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java
index 41bb75a..c9e5861 100755
--- a/src/main/java/io/trygvis/queue/AsyncService.java
+++ b/src/main/java/io/trygvis/queue/AsyncService.java
@@ -19,6 +19,8 @@ public interface AsyncService {
Task schedule(Queue queue, String... args);
+ Task schedule(long parent, Queue queue, String... args);
+
/**
* Polls for a new state of the execution.
*/
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
index 06e7eee..276541f 100644
--- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
@@ -94,6 +94,14 @@ public class JdbcAsyncService implements AsyncService {
@Transactional(propagation = REQUIRED)
public Task schedule(final Queue queue, String... args) {
+ return scheduleInner(null, queue, args);
+ }
+
+ public Task schedule(long parent, Queue queue, String... args) {
+ return scheduleInner(parent, queue, args);
+ }
+
+ private Task scheduleInner(Long parent, final Queue queue, String... args) {
Date scheduled = new Date();
StringBuilder arguments = new StringBuilder();
@@ -101,8 +109,8 @@ public class JdbcAsyncService implements AsyncService {
arguments.append(arg).append(' ');
}
- long id = taskDao.insert(queue.name, scheduled, arguments.toString());
- Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args));
+ long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
+ Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args));
log.info("Created task = {}", task);
registerSynchronization(new TransactionSynchronizationAdapter() {
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
index 9a2e65b..09d5060 100755
--- a/src/main/java/io/trygvis/queue/Task.java
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -7,6 +7,8 @@ public class Task {
public final long id;
+ public final Long parent;
+
public final String queue;
public final Date scheduled;
@@ -19,8 +21,9 @@ public class Task {
public final List<String> arguments;
- Task(long id, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
this.id = id;
+ this.parent = parent;
this.queue = queue;
this.scheduled = scheduled;
this.lastRun = lastRun;
@@ -31,16 +34,17 @@ public class Task {
}
public Task registerRun() {
- return new Task(id, queue, scheduled, new Date(), runCount + 1, completed, arguments);
+ return new Task(id, parent, queue, scheduled, new Date(), runCount + 1, completed, arguments);
}
public Task registerComplete(Date completed) {
- return new Task(id, queue, scheduled, lastRun, runCount, completed, arguments);
+ return new Task(id, parent, queue, scheduled, lastRun, runCount, completed, arguments);
}
public String toString() {
return "Task{" +
"id=" + id +
+ ", parent=" + parent +
", queue=" + queue +
", scheduled=" + scheduled +
", lastRun=" + lastRun +
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index dac99c7..a59dcbb 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -21,10 +21,14 @@ public class TaskDao {
@Autowired
private JdbcTemplate jdbcTemplate;
- @Transactional(propagation = MANDATORY)
public long insert(String queue, Date scheduled, String arguments) {
- jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " +
- "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments);
+ return this.insert(null, queue, scheduled, arguments);
+ }
+
+ @Transactional(propagation = MANDATORY)
+ public long insert(Long parent, String queue, Date scheduled, String arguments) {
+ jdbcTemplate.update("INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)", parent, queue, scheduled, arguments);
return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class);
}
@@ -47,17 +51,18 @@ public class TaskDao {
}
private class TaskRowMapper implements RowMapper<Task> {
- public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments";
+ public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
public Task mapRow(ResultSet rs, int rowNum) throws SQLException {
- String arguments = rs.getString(7);
+ String arguments = rs.getString(8);
return new Task(
rs.getLong(1),
- rs.getString(2),
- rs.getTimestamp(3),
+ rs.getLong(2),
+ rs.getString(3),
rs.getTimestamp(4),
- rs.getInt(5),
- rs.getTimestamp(6),
+ rs.getTimestamp(5),
+ rs.getInt(6),
+ rs.getTimestamp(7),
arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList());
}
}
diff --git a/src/main/resources/create.sql b/src/main/resources/create.sql
index ed8913f..f7f2939 100644
--- a/src/main/resources/create.sql
+++ b/src/main/resources/create.sql
@@ -11,7 +11,8 @@ CREATE TABLE queue (
);
CREATE TABLE task (
- id INTEGER NOT NULL,
+ id BIGINT NOT NULL,
+ parent BIGINT NOT NULL,
queue VARCHAR(100) NOT NULL,
scheduled TIMESTAMP NOT NULL,
last_run TIMESTAMP,
@@ -19,7 +20,8 @@ CREATE TABLE task (
completed TIMESTAMP,
arguments VARCHAR(100),
CONSTRAINT pk_task PRIMARY KEY (id),
- CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name)
+ CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name),
+ CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id)
);
CREATE SEQUENCE task_id;