aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcAsyncService.java60
1 files changed, 40 insertions, 20 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
index a8f581e..06e7eee 100644
--- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
@@ -1,19 +1,27 @@
package io.trygvis.queue;
-import org.quartz.*;
-import org.slf4j.*;
-import org.springframework.beans.factory.annotation.*;
-import org.springframework.stereotype.*;
-import org.springframework.transaction.annotation.*;
-import org.springframework.transaction.support.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.Arrays.*;
-import static java.util.concurrent.TimeUnit.*;
-import static org.springframework.transaction.annotation.Propagation.*;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.springframework.transaction.annotation.Propagation.REQUIRED;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
@Component
public class JdbcAsyncService implements AsyncService {
@@ -96,12 +104,6 @@ public class JdbcAsyncService implements AsyncService {
long id = taskDao.insert(queue.name, scheduled, arguments.toString());
Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args));
log.info("Created task = {}", task);
-// queues.get(queue.name).ping();
-// try {
-// Thread.sleep(500);
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
registerSynchronization(new TransactionSynchronizationAdapter() {
public void afterCompletion(int status) {
@@ -114,6 +116,24 @@ public class JdbcAsyncService implements AsyncService {
return task;
}
+ @Transactional
+ public Task await(Task task, long timeout) {
+ final long start = currentTimeMillis();
+ final long end = start + timeout;
+
+ while (currentTimeMillis() < end) {
+ task = update(task);
+
+ try {
+ sleep(100);
+ } catch (InterruptedException e) {
+ // break
+ }
+ }
+
+ return task;
+ }
+
@Transactional(readOnly = true)
public Task update(Task ref) {
return taskDao.findById(ref.id);